You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:33 UTC

[02/50] [abbrv] beam git commit: Improve HadoopInputFormatIO DisplayData and Cassandra tests

Improve HadoopInputFormatIO DisplayData and Cassandra tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c08b7b17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c08b7b17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c08b7b17

Branch: refs/heads/DSL_SQL
Commit: c08b7b1771481b77c94ec78a96db5b34fec29841
Parents: 8beea73
Author: Dipti Kulkarni <di...@persistent.co.in>
Authored: Mon Apr 10 15:43:12 2017 +0530
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 10:20:15 2017 -0700

----------------------------------------------------------------------
 .../hadoop/inputformat/HadoopInputFormatIO.java |  33 +--
 .../inputformat/HadoopInputFormatIOTest.java    |  99 ++++++---
 sdks/java/io/hadoop/jdk1.8-tests/.toDelete      |   0
 .../HIFIOWithEmbeddedCassandraTest.java         | 214 +++++++++++++++++++
 4 files changed, 306 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 675f4bf..61dc1bf 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -36,9 +36,7 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -360,20 +358,6 @@ public class HadoopInputFormatIO {
             + e.getMessage(), e);
       }
     }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      if (getConfiguration().getHadoopConfiguration() != null) {
-        Iterator<Entry<String, String>> configProperties = getConfiguration()
-            .getHadoopConfiguration().iterator();
-        while (configProperties.hasNext()) {
-          Entry<String, String> property = configProperties.next();
-          builder.addIfNotNull(DisplayData.item(property.getKey(), property.getValue())
-              .withLabel(property.getKey()));
-        }
-      }
-    }
   }
 
   /**
@@ -447,6 +431,23 @@ public class HadoopInputFormatIO {
     }
 
     @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      Configuration hadoopConfig = getConfiguration().getHadoopConfiguration();
+      if (hadoopConfig != null) {
+        builder.addIfNotNull(DisplayData.item("mapreduce.job.inputformat.class",
+            hadoopConfig.get("mapreduce.job.inputformat.class"))
+            .withLabel("InputFormat Class"));
+        builder.addIfNotNull(DisplayData.item("key.class",
+            hadoopConfig.get("key.class"))
+            .withLabel("Key Class"));
+        builder.addIfNotNull(DisplayData.item("value.class",
+            hadoopConfig.get("value.class"))
+            .withLabel("Value Class"));
+      }
+    }
+
+    @Override
     public List<BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
         PipelineOptions options) throws Exception {
       // desiredBundleSizeBytes is not being considered as splitting based on this

http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 2f2857b..3a4a99d 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -21,9 +21,7 @@ import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
 
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -157,24 +155,6 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#populateDisplayData()
-   * populateDisplayData()}.
-   */
-  @Test
-  public void testReadDisplayData() {
-    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslate)
-        .withValueTranslation(myValueTranslate);
-    DisplayData displayData = DisplayData.from(read);
-    Iterator<Entry<String, String>> propertyElement = serConf.getHadoopConfiguration().iterator();
-    while (propertyElement.hasNext()) {
-      Entry<String, String> element = propertyElement.next();
-      assertThat(displayData, hasDisplayItem(element.getKey(), element.getValue()));
-    }
-  }
-
-  /**
    * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
    * null configuration. {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}
    * method checks configuration is null and throws exception if it is null.
@@ -415,6 +395,32 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
+   * This test validates functionality of
+   * {@link HadoopInputFormatIO.HadoopInputFormatBoundedSource#populateDisplayData()
+   * populateDisplayData()}.
+   */
+  @Test
+  public void testReadDisplayData() {
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit());
+    DisplayData displayData = DisplayData.from(boundedSource);
+    assertThat(
+        displayData,
+        hasDisplayItem("mapreduce.job.inputformat.class",
+            serConf.getHadoopConfiguration().get("mapreduce.job.inputformat.class")));
+    assertThat(displayData,
+        hasDisplayItem("key.class", serConf.getHadoopConfiguration().get("key.class")));
+    assertThat(displayData,
+        hasDisplayItem("value.class", serConf.getHadoopConfiguration().get("value.class")));
+  }
+
+  /**
    * This test validates behavior of {@link HadoopInputFormatBoundedSource} if RecordReader object
    * creation fails.
    */
@@ -472,7 +478,8 @@ public class HadoopInputFormatIOTest {
    */
   @Test
   public void testReadersStartWhenZeroRecords() throws Exception {
-    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+
+    InputFormat mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
     EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
     Mockito.when(
         mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
@@ -487,9 +494,11 @@ public class HadoopInputFormatIOTest {
             null, // No key translation required.
             null, // No value translation required.
             new SerializableSplit(mockInputSplit));
-    BoundedReader<KV<Text, Employee>> boundedReader = boundedSource.createReader(p.getOptions());
-    assertEquals(false, boundedReader.start());
-    assertEquals(Double.valueOf(1), boundedReader.getFractionConsumed());
+    boundedSource.setInputFormatObj(mockInputFormat);
+    BoundedReader<KV<Text, Employee>> reader = boundedSource.createReader(p.getOptions());
+    assertEquals(false, reader.start());
+    assertEquals(Double.valueOf(1), reader.getFractionConsumed());
+    reader.close();
   }
 
   /**
@@ -546,6 +555,48 @@ public class HadoopInputFormatIOTest {
     assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
   }
 
+/**
+   * This test validates the method getFractionConsumed()- when a bad progress value is returned by
+   * the inputformat.
+   */
+  @Test
+  public void testGetFractionConsumedForBadProgressValue() throws Exception {
+    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
+    Mockito.when(
+        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
+            Mockito.any(TaskAttemptContext.class))).thenReturn(mockReader);
+    Mockito.when(mockReader.nextKeyValue()).thenReturn(true);
+    // Set to a bad value , not in range of 0 to 1
+    Mockito.when(mockReader.getProgress()).thenReturn(2.0F);
+    InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit(mockInputSplit));
+    boundedSource.setInputFormatObj(mockInputFormat);
+    BoundedReader<KV<Text, Employee>> reader = boundedSource.createReader(p.getOptions());
+    assertEquals(Double.valueOf(0), reader.getFractionConsumed());
+    boolean start = reader.start();
+    assertEquals(true, start);
+    if (start) {
+      boolean advance = reader.advance();
+      assertEquals(null, reader.getFractionConsumed());
+      assertEquals(true, advance);
+      if (advance) {
+        advance = reader.advance();
+        assertEquals(null, reader.getFractionConsumed());
+      }
+    }
+    // Validate if getFractionConsumed() returns null after few number of reads as getProgress
+    // returns invalid value '2' which is not in the range of 0 to 1.
+    assertEquals(null, reader.getFractionConsumed());
+    reader.close();
+  }
   /**
    * This test validates that reader and its parent source reads the same records.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/jdk1.8-tests/.toDelete
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/.toDelete b/sdks/java/io/hadoop/jdk1.8-tests/.toDelete
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
new file mode 100644
index 0000000..97addcf
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests to validate HadoopInputFormatIO for embedded Cassandra instance.
+ */
+@RunWith(JUnit4.class)
+public class HIFIOWithEmbeddedCassandraTest implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final String CASSANDRA_KEYSPACE = "beamdb";
+  private static final String CASSANDRA_HOST = "127.0.0.1";
+  private static final String CASSANDRA_TABLE = "scientists";
+  private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port";
+  private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address";
+  private static final String CASSANDRA_PARTITIONER_CLASS_PROPERTY =
+      "cassandra.input.partitioner.class";
+  private static final String CASSANDRA_PARTITIONER_CLASS_VALUE = "Murmur3Partitioner";
+  private static final String CASSANDRA_KEYSPACE_PROPERTY = "cassandra.input.keyspace";
+  private static final String CASSANDRA_COLUMNFAMILY_PROPERTY = "cassandra.input.columnfamily";
+  private static final String CASSANDRA_PORT = "9061";
+  private static transient Cluster cluster;
+  private static transient Session session;
+  private static final long TEST_DATA_ROW_COUNT = 10L;
+  private static EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  /**
+   * Test to read data from embedded Cassandra instance and verify whether data is read
+   * successfully.
+   * @throws Exception
+   */
+  @Test
+  public void testHIFReadForCassandra() throws Exception {
+    // Expected hashcode is evaluated during insertion time one time and hardcoded here.
+    String expectedHashCode = "4651110ba1ef2cd3a7315091ca27877b18fceb0e";
+    Configuration conf = getConfiguration();
+    PCollection<KV<Long, String>> cassandraData =
+        p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
+            .withValueTranslation(myValueTranslate));
+    // Verify the count of data retrieved from Cassandra matches expected count.
+    PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
+        .isEqualTo(TEST_DATA_ROW_COUNT);
+    PCollection<String> textValues = cassandraData.apply(Values.<String>create());
+    // Verify the output values using checksum comparison.
+    PCollection<String> consolidatedHashcode =
+        textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
+    p.run().waitUntilFinish();
+  }
+
+  SimpleFunction<Row, String> myValueTranslate = new SimpleFunction<Row, String>() {
+    @Override
+    public String apply(Row input) {
+      String scientistRecord = input.getInt("id") + "|" + input.getString("scientist");
+      return scientistRecord;
+    }
+  };
+
+  /**
+   * Test to read data from embedded Cassandra instance based on query and verify whether data is
+   * read successfully.
+   */
+  @Test
+  public void testHIFReadForCassandraQuery() throws Exception {
+    Long expectedCount = 1L;
+    String expectedChecksum = "6a62f24ccce0713004889aec1cf226949482d188";
+    Configuration conf = getConfiguration();
+    conf.set("cassandra.input.cql", "select * from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_TABLE
+        + " where token(id) > ? and token(id) <= ? and scientist='Faraday1' allow filtering");
+    PCollection<KV<Long, String>> cassandraData =
+        p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
+            .withValueTranslation(myValueTranslate));
+    // Verify the count of data retrieved from Cassandra matches expected count.
+    PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
+        .isEqualTo(expectedCount);
+    PCollection<String> textValues = cassandraData.apply(Values.<String>create());
+    // Verify the output values using checksum comparison.
+    PCollection<String> consolidatedHashcode =
+        textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedChecksum);
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * Returns configuration of CqlInutFormat. Mandatory parameters required apart from inputformat
+   * class name, key class, value class are thrift port, thrift address, partitioner class, keyspace
+   * and columnfamily name
+   */
+  public Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, CASSANDRA_PORT);
+    conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, CASSANDRA_HOST);
+    conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
+    conf.set(CASSANDRA_KEYSPACE_PROPERTY, CASSANDRA_KEYSPACE);
+    conf.set(CASSANDRA_COLUMNFAMILY_PROPERTY, CASSANDRA_TABLE);
+    conf.setClass("mapreduce.job.inputformat.class",
+        org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
+    conf.setClass("key.class", java.lang.Long.class, Object.class);
+    conf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);
+    return conf;
+  }
+
+  public static void createCassandraData() throws Exception {
+    session.execute("CREATE KEYSPACE " + CASSANDRA_KEYSPACE
+        + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1};");
+    session.execute("USE " + CASSANDRA_KEYSPACE);
+    session.execute("CREATE TABLE " + CASSANDRA_TABLE
+        + "(id int, scientist text, PRIMARY KEY(id));");
+    for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) {
+      session.execute("INSERT INTO " + CASSANDRA_TABLE + "(id, scientist) values(" + i
+          + ", 'Faraday" + i + "');");
+    }
+  }
+
+  @BeforeClass
+  public static void startCassandra() throws Exception {
+    //Start the Embedded Cassandra Service
+    cassandra.start();
+    final SocketOptions socketOptions = new SocketOptions();
+    // Setting this to 0 disables read timeouts.
+    socketOptions.setReadTimeoutMillis(0);
+    // This defaults to 5 s.  Increase to a minute.
+    socketOptions.setConnectTimeoutMillis(60 * 1000);
+    cluster =
+        Cluster.builder().addContactPoint(CASSANDRA_HOST).withClusterName("beam")
+            .withSocketOptions(socketOptions).build();
+    session = cluster.connect();
+    createCassandraData();
+  }
+
+  @AfterClass
+  public static void stopEmbeddedCassandra() throws Exception {
+    session.close();
+    cluster.close();
+  }
+
+  /**
+   * POJO class for scientist data.
+   */
+  @Table(name = CASSANDRA_TABLE, keyspace = CASSANDRA_KEYSPACE)
+  public static class Scientist implements Serializable {
+    private static final long serialVersionUID = 1L;
+    @Column(name = "scientist")
+    private String name;
+    @Column(name = "id")
+    private int id;
+
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public String toString() {
+      return id + ":" + name;
+    }
+  }
+}