Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:33 UTC
[02/50] [abbrv] beam git commit: Improve HadoopInputFormatIO DisplayData and Cassandra tests
Improve HadoopInputFormatIO DisplayData and Cassandra tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c08b7b17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c08b7b17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c08b7b17
Branch: refs/heads/DSL_SQL
Commit: c08b7b1771481b77c94ec78a96db5b34fec29841
Parents: 8beea73
Author: Dipti Kulkarni <di...@persistent.co.in>
Authored: Mon Apr 10 15:43:12 2017 +0530
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 10:20:15 2017 -0700
----------------------------------------------------------------------
.../hadoop/inputformat/HadoopInputFormatIO.java | 33 +--
.../inputformat/HadoopInputFormatIOTest.java | 99 ++++++---
sdks/java/io/hadoop/jdk1.8-tests/.toDelete | 0
.../HIFIOWithEmbeddedCassandraTest.java | 214 +++++++++++++++++++
4 files changed, 306 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 675f4bf..61dc1bf 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -36,9 +36,7 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -360,20 +358,6 @@ public class HadoopInputFormatIO {
+ e.getMessage(), e);
}
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- if (getConfiguration().getHadoopConfiguration() != null) {
- Iterator<Entry<String, String>> configProperties = getConfiguration()
- .getHadoopConfiguration().iterator();
- while (configProperties.hasNext()) {
- Entry<String, String> property = configProperties.next();
- builder.addIfNotNull(DisplayData.item(property.getKey(), property.getValue())
- .withLabel(property.getKey()));
- }
- }
- }
}
/**
@@ -447,6 +431,23 @@ public class HadoopInputFormatIO {
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ Configuration hadoopConfig = getConfiguration().getHadoopConfiguration();
+ if (hadoopConfig != null) {
+ builder.addIfNotNull(DisplayData.item("mapreduce.job.inputformat.class",
+ hadoopConfig.get("mapreduce.job.inputformat.class"))
+ .withLabel("InputFormat Class"));
+ builder.addIfNotNull(DisplayData.item("key.class",
+ hadoopConfig.get("key.class"))
+ .withLabel("Key Class"));
+ builder.addIfNotNull(DisplayData.item("value.class",
+ hadoopConfig.get("value.class"))
+ .withLabel("Value Class"));
+ }
+ }
+
+ @Override
public List<BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
PipelineOptions options) throws Exception {
// desiredBundleSizeBytes is not being considered as splitting based on this
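
The net effect of the two hunks above: populateDisplayData() moves from the Read transform to the bounded source, and instead of iterating every property in the Hadoop Configuration it now surfaces only the keys that identify the source. A minimal sketch of that pattern, assuming only the Beam DisplayData API used above (ExampleSource and its config field are hypothetical):

import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.hadoop.conf.Configuration;

// Hypothetical source illustrating the DisplayData pattern used above:
// register only the identifying configuration keys instead of the
// whole Configuration.
abstract class ExampleSource<T> extends BoundedSource<T> {
  private final Configuration config;

  ExampleSource(Configuration config) {
    this.config = config;
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    if (config != null) {
      // addIfNotNull() drops entries whose value is null, so unset
      // keys simply do not appear in the monitoring UI.
      builder.addIfNotNull(
          DisplayData.item("mapreduce.job.inputformat.class",
                  config.get("mapreduce.job.inputformat.class"))
              .withLabel("InputFormat Class"));
    }
  }
}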
http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 2f2857b..3a4a99d 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -21,9 +21,7 @@ import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -157,24 +155,6 @@ public class HadoopInputFormatIOTest {
}
/**
- * This test validates functionality of {@link HadoopInputFormatIO.Read#populateDisplayData()
- * populateDisplayData()}.
- */
- @Test
- public void testReadDisplayData() {
- HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
- .withConfiguration(serConf.getHadoopConfiguration())
- .withKeyTranslation(myKeyTranslate)
- .withValueTranslation(myValueTranslate);
- DisplayData displayData = DisplayData.from(read);
- Iterator<Entry<String, String>> propertyElement = serConf.getHadoopConfiguration().iterator();
- while (propertyElement.hasNext()) {
- Entry<String, String> element = propertyElement.next();
- assertThat(displayData, hasDisplayItem(element.getKey(), element.getValue()));
- }
- }
-
- /**
* This test validates that {@link HadoopInputFormatIO.Read Read} transform object creation fails
* with a null configuration. The {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}
* method checks whether the configuration is null and throws an exception if it is.
@@ -415,6 +395,32 @@ public class HadoopInputFormatIOTest {
}
/**
+ * This test validates functionality of
+ * {@link HadoopInputFormatIO.HadoopInputFormatBoundedSource#populateDisplayData()
+ * populateDisplayData()}.
+ */
+ @Test
+ public void testReadDisplayData() {
+ HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+ new HadoopInputFormatBoundedSource<Text, Employee>(
+ serConf,
+ WritableCoder.of(Text.class),
+ AvroCoder.of(Employee.class),
+ null, // No key translation required.
+ null, // No value translation required.
+ new SerializableSplit());
+ DisplayData displayData = DisplayData.from(boundedSource);
+ assertThat(
+ displayData,
+ hasDisplayItem("mapreduce.job.inputformat.class",
+ serConf.getHadoopConfiguration().get("mapreduce.job.inputformat.class")));
+ assertThat(displayData,
+ hasDisplayItem("key.class", serConf.getHadoopConfiguration().get("key.class")));
+ assertThat(displayData,
+ hasDisplayItem("value.class", serConf.getHadoopConfiguration().get("value.class")));
+ }
+
+ /**
* This test validates behavior of {@link HadoopInputFormatBoundedSource} if RecordReader object
* creation fails.
*/
@@ -472,7 +478,8 @@ public class HadoopInputFormatIOTest {
*/
@Test
public void testReadersStartWhenZeroRecords() throws Exception {
- InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+
+ InputFormat mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
Mockito.when(
mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
@@ -487,9 +494,11 @@ public class HadoopInputFormatIOTest {
null, // No key translation required.
null, // No value translation required.
new SerializableSplit(mockInputSplit));
- BoundedReader<KV<Text, Employee>> boundedReader = boundedSource.createReader(p.getOptions());
- assertEquals(false, boundedReader.start());
- assertEquals(Double.valueOf(1), boundedReader.getFractionConsumed());
+ boundedSource.setInputFormatObj(mockInputFormat);
+ BoundedReader<KV<Text, Employee>> reader = boundedSource.createReader(p.getOptions());
+ assertEquals(false, reader.start());
+ assertEquals(Double.valueOf(1), reader.getFractionConsumed());
+ reader.close();
}
/**
@@ -546,6 +555,48 @@ public class HadoopInputFormatIOTest {
assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
}
+/**
+ * This test validates the method getFractionConsumed() when a bad progress value is returned by
+ * the InputFormat.
+ */
+ @Test
+ public void testGetFractionConsumedForBadProgressValue() throws Exception {
+ InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+ EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
+ Mockito.when(
+ mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
+ Mockito.any(TaskAttemptContext.class))).thenReturn(mockReader);
+ Mockito.when(mockReader.nextKeyValue()).thenReturn(true);
+ // Set to a bad value, not in the range of 0 to 1.
+ Mockito.when(mockReader.getProgress()).thenReturn(2.0F);
+ InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
+ HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+ new HadoopInputFormatBoundedSource<Text, Employee>(
+ serConf,
+ WritableCoder.of(Text.class),
+ AvroCoder.of(Employee.class),
+ null, // No key translation required.
+ null, // No value translation required.
+ new SerializableSplit(mockInputSplit));
+ boundedSource.setInputFormatObj(mockInputFormat);
+ BoundedReader<KV<Text, Employee>> reader = boundedSource.createReader(p.getOptions());
+ assertEquals(Double.valueOf(0), reader.getFractionConsumed());
+ boolean start = reader.start();
+ assertEquals(true, start);
+ if (start) {
+ boolean advance = reader.advance();
+ assertEquals(null, reader.getFractionConsumed());
+ assertEquals(true, advance);
+ if (advance) {
+ advance = reader.advance();
+ assertEquals(null, reader.getFractionConsumed());
+ }
+ }
+ // Validate that getFractionConsumed() returns null after a few reads, as getProgress()
+ // returns the invalid value '2', which is not in the range of 0 to 1.
+ assertEquals(null, reader.getFractionConsumed());
+ reader.close();
+ }
/**
* This test validates that the reader and its parent source read the same records.
*/
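
The new testGetFractionConsumedForBadProgressValue() pins down one contract: once the underlying RecordReader reports progress outside [0, 1], getFractionConsumed() returns null rather than a misleading fraction. A sketch of the kind of guard that satisfies this contract (a hypothetical helper, not the actual Beam implementation):

// Hypothetical guard illustrating the contract exercised by
// testGetFractionConsumedForBadProgressValue(): RecordReader progress
// outside [0, 1] is treated as unknown, so the reader reports null.
final class ProgressGuard {
  static Double fractionConsumed(float progress) {
    if (Float.isNaN(progress) || progress < 0.0f || progress > 1.0f) {
      return null; // e.g. the mocked getProgress() value of 2.0F above
    }
    return (double) progress;
  }
}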
http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/jdk1.8-tests/.toDelete
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/.toDelete b/sdks/java/io/hadoop/jdk1.8-tests/.toDelete
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
new file mode 100644
index 0000000..97addcf
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests to validate HadoopInputFormatIO for an embedded Cassandra instance.
+ */
+@RunWith(JUnit4.class)
+public class HIFIOWithEmbeddedCassandraTest implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final String CASSANDRA_KEYSPACE = "beamdb";
+ private static final String CASSANDRA_HOST = "127.0.0.1";
+ private static final String CASSANDRA_TABLE = "scientists";
+ private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port";
+ private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address";
+ private static final String CASSANDRA_PARTITIONER_CLASS_PROPERTY =
+ "cassandra.input.partitioner.class";
+ private static final String CASSANDRA_PARTITIONER_CLASS_VALUE = "Murmur3Partitioner";
+ private static final String CASSANDRA_KEYSPACE_PROPERTY = "cassandra.input.keyspace";
+ private static final String CASSANDRA_COLUMNFAMILY_PROPERTY = "cassandra.input.columnfamily";
+ private static final String CASSANDRA_PORT = "9061";
+ private static transient Cluster cluster;
+ private static transient Session session;
+ private static final long TEST_DATA_ROW_COUNT = 10L;
+ private static EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
+ /**
+ * Test to read data from an embedded Cassandra instance and verify that the data is read
+ * successfully.
+ * @throws Exception
+ */
+ @Test
+ public void testHIFReadForCassandra() throws Exception {
+ // The expected hashcode is evaluated once at insertion time and hardcoded here.
+ String expectedHashCode = "4651110ba1ef2cd3a7315091ca27877b18fceb0e";
+ Configuration conf = getConfiguration();
+ PCollection<KV<Long, String>> cassandraData =
+ p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
+ .withValueTranslation(myValueTranslate));
+ // Verify the count of data retrieved from Cassandra matches expected count.
+ PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
+ .isEqualTo(TEST_DATA_ROW_COUNT);
+ PCollection<String> textValues = cassandraData.apply(Values.<String>create());
+ // Verify the output values using checksum comparison.
+ PCollection<String> consolidatedHashcode =
+ textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+ PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
+ p.run().waitUntilFinish();
+ }
+
+ SimpleFunction<Row, String> myValueTranslate = new SimpleFunction<Row, String>() {
+ @Override
+ public String apply(Row input) {
+ String scientistRecord = input.getInt("id") + "|" + input.getString("scientist");
+ return scientistRecord;
+ }
+ };
+
+ /**
+ * Test to read data from an embedded Cassandra instance based on a query and verify that the
+ * data is read successfully.
+ */
+ @Test
+ public void testHIFReadForCassandraQuery() throws Exception {
+ Long expectedCount = 1L;
+ String expectedChecksum = "6a62f24ccce0713004889aec1cf226949482d188";
+ Configuration conf = getConfiguration();
+ conf.set("cassandra.input.cql", "select * from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_TABLE
+ + " where token(id) > ? and token(id) <= ? and scientist='Faraday1' allow filtering");
+ PCollection<KV<Long, String>> cassandraData =
+ p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
+ .withValueTranslation(myValueTranslate));
+ // Verify the count of data retrieved from Cassandra matches expected count.
+ PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
+ .isEqualTo(expectedCount);
+ PCollection<String> textValues = cassandraData.apply(Values.<String>create());
+ // Verify the output values using checksum comparison.
+ PCollection<String> consolidatedHashcode =
+ textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+ PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedChecksum);
+ p.run().waitUntilFinish();
+ }
+
+ /**
+ * Returns the configuration for CqlInputFormat. Apart from the InputFormat class name, key
+ * class, and value class, the mandatory parameters are the Thrift port, Thrift address,
+ * partitioner class, keyspace, and column family name.
+ */
+ public Configuration getConfiguration() {
+ Configuration conf = new Configuration();
+ conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, CASSANDRA_PORT);
+ conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, CASSANDRA_HOST);
+ conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
+ conf.set(CASSANDRA_KEYSPACE_PROPERTY, CASSANDRA_KEYSPACE);
+ conf.set(CASSANDRA_COLUMNFAMILY_PROPERTY, CASSANDRA_TABLE);
+ conf.setClass("mapreduce.job.inputformat.class",
+ org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
+ conf.setClass("key.class", java.lang.Long.class, Object.class);
+ conf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);
+ return conf;
+ }
+
+ public static void createCassandraData() throws Exception {
+ session.execute("CREATE KEYSPACE " + CASSANDRA_KEYSPACE
+ + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1};");
+ session.execute("USE " + CASSANDRA_KEYSPACE);
+ session.execute("CREATE TABLE " + CASSANDRA_TABLE
+ + "(id int, scientist text, PRIMARY KEY(id));");
+ for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) {
+ session.execute("INSERT INTO " + CASSANDRA_TABLE + "(id, scientist) values(" + i
+ + ", 'Faraday" + i + "');");
+ }
+ }
+
+ @BeforeClass
+ public static void startCassandra() throws Exception {
+ // Start the embedded Cassandra service.
+ cassandra.start();
+ final SocketOptions socketOptions = new SocketOptions();
+ // Setting this to 0 disables read timeouts.
+ socketOptions.setReadTimeoutMillis(0);
+ // This defaults to 5 s. Increase to a minute.
+ socketOptions.setConnectTimeoutMillis(60 * 1000);
+ cluster =
+ Cluster.builder().addContactPoint(CASSANDRA_HOST).withClusterName("beam")
+ .withSocketOptions(socketOptions).build();
+ session = cluster.connect();
+ createCassandraData();
+ }
+
+ @AfterClass
+ public static void stopEmbeddedCassandra() throws Exception {
+ session.close();
+ cluster.close();
+ }
+
+ /**
+ * POJO class for scientist data.
+ */
+ @Table(name = CASSANDRA_TABLE, keyspace = CASSANDRA_KEYSPACE)
+ public static class Scientist implements Serializable {
+ private static final long serialVersionUID = 1L;
+ @Column(name = "scientist")
+ private String name;
+ @Column(name = "id")
+ private int id;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public String toString() {
+ return id + ":" + name;
+ }
+ }
+}
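
Taken together, the Cassandra tests also document the intended usage pattern: build a Hadoop Configuration that names CqlInputFormat and its key/value classes, then hand it to HadoopInputFormatIO.read() with a value translation. A condensed, self-contained sketch under the same assumptions as the tests above (CassandraReadExample and ROW_TO_STRING are hypothetical names; the host, port, keyspace, and table values are the test's, not production defaults):

import com.datastax.driver.core.Row;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;

class CassandraReadExample {
  // Translate each Cassandra Row into a plain String, as in the tests
  // above, so the pipeline does not need a coder for Row itself.
  static final SimpleFunction<Row, String> ROW_TO_STRING =
      new SimpleFunction<Row, String>() {
        @Override
        public String apply(Row input) {
          return input.getInt("id") + "|" + input.getString("scientist");
        }
      };

  // Values mirror the embedded-Cassandra test above; adjust the host,
  // port, keyspace, and table for a real cluster.
  static PCollection<KV<Long, String>> read(Pipeline pipeline) {
    Configuration conf = new Configuration();
    conf.set("cassandra.input.thrift.port", "9061");
    conf.set("cassandra.input.thrift.address", "127.0.0.1");
    conf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
    conf.set("cassandra.input.keyspace", "beamdb");
    conf.set("cassandra.input.columnfamily", "scientists");
    conf.setClass("mapreduce.job.inputformat.class",
        org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
    conf.setClass("key.class", Long.class, Object.class);
    conf.setClass("value.class", Row.class, Object.class);
    return pipeline.apply(
        HadoopInputFormatIO.<Long, String>read()
            .withConfiguration(conf)
            .withValueTranslation(ROW_TO_STRING));
  }
}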