You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/04/06 14:33:22 UTC

[3/7] beam git commit: HadoopInputFormatIO with junits

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
new file mode 100644
index 0000000..2f2857b
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.EmployeeRecordReader;
+import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsEmployeeInputSplit;
+import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource;
+import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link HadoopInputFormatIO}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopInputFormatIOTest {
+  static SerializableConfiguration serConf;
+  static SimpleFunction<Text, String> myKeyTranslate;
+  static SimpleFunction<Employee, String> myValueTranslate;
+
+  @Rule public final transient TestPipeline p = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private PBegin input = PBegin.in(p);
+
+  @BeforeClass
+  public static void setUp() throws IOException, InterruptedException {
+    serConf = loadTestConfiguration(
+                  EmployeeInputFormat.class,
+                  Text.class,
+                  Employee.class);
+    myKeyTranslate = new SimpleFunction<Text, String>() {
+      @Override
+      public String apply(Text input) {
+        return input.toString();
+      }
+    };
+    myValueTranslate = new SimpleFunction<Employee, String>() {
+      @Override
+      public String apply(Employee input) {
+        return input.getEmpName() + "_" + input.getEmpAddress();
+      }
+    };
+  }
+
+  @Test
+  public void testReadBuildsCorrectly() {
+    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withKeyTranslation(myKeyTranslate)
+        .withValueTranslation(myValueTranslate);
+    assertEquals(serConf.getHadoopConfiguration(),
+        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
+    assertEquals(myValueTranslate, read.getValueTranslationFunction());
+    assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor());
+    assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} builds correctly in different order
+   * of with configuration/key translation/value translation. This test also validates output
+   * PCollection key/value classes are set correctly even if Hadoop configuration is set after
+   * setting key/value translation.
+   */
+  @Test
+  public void testReadBuildsCorrectlyInDifferentOrder() {
+    HadoopInputFormatIO.Read<String, String> read =
+        HadoopInputFormatIO.<String, String>read()
+            .withValueTranslation(myValueTranslate)
+            .withConfiguration(serConf.getHadoopConfiguration())
+            .withKeyTranslation(myKeyTranslate);
+    assertEquals(serConf.getHadoopConfiguration(),
+        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
+    assertEquals(myValueTranslate, read.getValueTranslationFunction());
+    assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
+    assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor());
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} object creation if
+   * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()} is called more than
+   * once.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testReadBuildsCorrectlyIfWithConfigurationIsCalledMoreThanOneTime()
+      throws IOException, InterruptedException {
+    SerializableConfiguration diffConf =
+        loadTestConfiguration(
+            EmployeeInputFormat.class,
+            Employee.class,
+            Text.class);
+    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withKeyTranslation(myKeyTranslate)
+        .withConfiguration(diffConf.getHadoopConfiguration());
+    assertEquals(diffConf.getHadoopConfiguration(),
+        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
+    assertEquals(null, read.getValueTranslationFunction());
+    assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
+    assertEquals(diffConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+        .getValueTypeDescriptor().getRawType());
+  }
+
+  /**
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#populateDisplayData()
+   * populateDisplayData()}.
+   */
+  @Test
+  public void testReadDisplayData() {
+    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withKeyTranslation(myKeyTranslate)
+        .withValueTranslation(myValueTranslate);
+    DisplayData displayData = DisplayData.from(read);
+    Iterator<Entry<String, String>> propertyElement = serConf.getHadoopConfiguration().iterator();
+    while (propertyElement.hasNext()) {
+      Entry<String, String> element = propertyElement.next();
+      assertThat(displayData, hasDisplayItem(element.getKey(), element.getValue()));
+    }
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
+   * null configuration. {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}
+   * method checks configuration is null and throws exception if it is null.
+   */
+  @Test
+  public void testReadObjectCreationFailsIfConfigurationIsNull() {
+    thrown.expect(NullPointerException.class);
+    HadoopInputFormatIO.<Text, Employee>read()
+          .withConfiguration(null);
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with only
+   * configuration.
+   */
+  @Test
+  public void testReadObjectCreationWithConfiguration() {
+    HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
+        .withConfiguration(serConf.getHadoopConfiguration());
+    assertEquals(serConf.getHadoopConfiguration(),
+        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(null, read.getKeyTranslationFunction());
+    assertEquals(null, read.getValueTranslationFunction());
+    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class), read
+        .getKeyTypeDescriptor().getRawType());
+    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+        .getValueTypeDescriptor().getRawType());
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
+   * configuration and null key translation. {@link HadoopInputFormatIO.Read#withKeyTranslation()
+   * withKeyTranslation()} checks keyTranslation is null and throws exception if it null value is
+   * passed.
+   */
+  @Test
+  public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() {
+    thrown.expect(NullPointerException.class);
+    HadoopInputFormatIO.<String, Employee>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withKeyTranslation(null);
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with
+   * configuration and key translation.
+   */
+  @Test
+  public void testReadObjectCreationWithConfigurationKeyTranslation() {
+    HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withKeyTranslation(myKeyTranslate);
+    assertEquals(serConf.getHadoopConfiguration(),
+        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
+    assertEquals(null, read.getValueTranslationFunction());
+    assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
+        read.getKeyTypeDescriptor().getRawType());
+    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class),
+        read.getValueTypeDescriptor().getRawType());
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
+   * configuration and null value translation.
+   * {@link HadoopInputFormatIO.Read#withValueTranslation() withValueTranslation()} checks
+   * valueTranslation is null and throws exception if null value is passed.
+   */
+  @Test
+  public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() {
+    thrown.expect(NullPointerException.class);
+    HadoopInputFormatIO.<Text, String>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withValueTranslation(null);
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with
+   * configuration and value translation.
+   */
+  @Test
+  public void testReadObjectCreationWithConfigurationValueTranslation() {
+    HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withValueTranslation(myValueTranslate);
+    assertEquals(serConf.getHadoopConfiguration(),
+        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(null, read.getKeyTranslationFunction());
+    assertEquals(myValueTranslate, read.getValueTranslationFunction());
+    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class),
+        read.getKeyTypeDescriptor().getRawType());
+    assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(),
+        read.getValueTypeDescriptor().getRawType());
+  }
+
+  /**
+   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with
+   * configuration, key translation and value translation.
+   */
+  @Test
+  public void testReadObjectCreationWithConfigurationKeyTranslationValueTranslation() {
+    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withKeyTranslation(myKeyTranslate)
+        .withValueTranslation(myValueTranslate);
+    assertEquals(serConf.getHadoopConfiguration(),
+        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
+    assertEquals(myValueTranslate, read.getValueTranslationFunction());
+    assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
+        read.getKeyTypeDescriptor().getRawType());
+    assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(),
+        read.getValueTypeDescriptor().getRawType());
+  }
+
+  /**
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
+   * Read.validate()} function when Read transform is created without calling
+   * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}.
+   */
+  @Test
+  public void testReadValidationFailsMissingConfiguration() {
+    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read();
+    thrown.expect(NullPointerException.class);
+    read.validate(input);
+  }
+
+  /**
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration()
+   * withConfiguration()} function when Hadoop InputFormat class is not provided by the user in
+   * configuration.
+   */
+  @Test
+  public void testReadValidationFailsMissingInputFormatInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass("key.class", Text.class, Object.class);
+    configuration.setClass("value.class", Employee.class, Object.class);
+    thrown.expect(NullPointerException.class);
+    HadoopInputFormatIO.<Text, Employee>read()
+        .withConfiguration(configuration);
+  }
+
+  /**
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration()
+   * withConfiguration()} function when key class is not provided by the user in configuration.
+   */
+  @Test
+  public void testReadValidationFailsMissingKeyClassInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class,
+        InputFormat.class);
+    configuration.setClass("value.class", Employee.class, Object.class);
+    thrown.expect(NullPointerException.class);
+    HadoopInputFormatIO.<Text, Employee>read()
+        .withConfiguration(configuration);
+  }
+
+  /**
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration()
+   * withConfiguration()} function when value class is not provided by the user in configuration.
+   */
+  @Test
+  public void testReadValidationFailsMissingValueClassInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class,
+        InputFormat.class);
+    configuration.setClass("key.class", Text.class, Object.class);
+    thrown.expect(NullPointerException.class);
+    HadoopInputFormatIO.<Text, Employee>read().withConfiguration(configuration);
+  }
+
+  /**
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
+   * Read.validate()} function when myKeyTranslate's (simple function provided by user for key
+   * translation) input type is not same as Hadoop InputFormat's keyClass(Which is property set in
+   * configuration as "key.class").
+   */
+  @Test
+  public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() {
+    SimpleFunction<LongWritable, String> myKeyTranslateWithWrongInputType =
+        new SimpleFunction<LongWritable, String>() {
+          @Override
+          public String apply(LongWritable input) {
+            return input.toString();
+          }
+        };
+    HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
+        .withConfiguration(serConf.getHadoopConfiguration())
+        .withKeyTranslation(myKeyTranslateWithWrongInputType);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format(
+        "Key translation's input type is not same as hadoop InputFormat : %s key " + "class : %s",
+        serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
+            InputFormat.class), serConf.getHadoopConfiguration()
+            .getClass("key.class", Object.class)));
+    read.validate(input);
+  }
+
+  /**
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
+   * Read.validate()} function when myValueTranslate's (simple function provided by user for value
+   * translation) input type is not same as Hadoop InputFormat's valueClass(Which is property set in
+   * configuration as "value.class").
+   */
+  @Test
+  public void testReadValidationFailsWithWrongInputTypeValueTranslationFunction() {
+    SimpleFunction<LongWritable, String> myValueTranslateWithWrongInputType =
+        new SimpleFunction<LongWritable, String>() {
+          @Override
+          public String apply(LongWritable input) {
+            return input.toString();
+          }
+        };
+    HadoopInputFormatIO.Read<Text, String> read =
+        HadoopInputFormatIO.<Text, String>read()
+            .withConfiguration(serConf.getHadoopConfiguration())
+            .withValueTranslation(myValueTranslateWithWrongInputType);
+    String expectedMessage =
+        String.format(
+            "Value translation's input type is not same as hadoop InputFormat :  "
+                + "%s value class : %s",
+            serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
+                InputFormat.class),
+            serConf.getHadoopConfiguration().getClass("value.class", Object.class));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(expectedMessage);
+    read.validate(input);
+  }
+
+  @Test
+  public void testReadingData() throws Exception {
+    HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
+        .withConfiguration(serConf.getHadoopConfiguration());
+    List<KV<Text, Employee>> expected = TestEmployeeDataSet.getEmployeeData();
+    PCollection<KV<Text, Employee>> actual = p.apply("ReadTest", read);
+    PAssert.that(actual).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * This test validates behavior of {@link HadoopInputFormatBoundedSource} if RecordReader object
+   * creation fails.
+   */
+  @Test
+  public void testReadIfCreateRecordReaderFails() throws Exception {
+    thrown.expect(Exception.class);
+    thrown.expectMessage("Exception in creating RecordReader");
+    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    Mockito.when(
+        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
+            Mockito.any(TaskAttemptContext.class))).thenThrow(
+        new IOException("Exception in creating RecordReader"));
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit());
+    boundedSource.setInputFormatObj(mockInputFormat);
+    SourceTestUtils.readFromSource(boundedSource, p.getOptions());
+  }
+
+  /**
+   * This test validates behavior of HadoopInputFormatSource if
+   * {@link InputFormat#createRecordReader() createRecordReader()} of InputFormat returns null.
+   */
+  @Test
+  public void testReadWithNullCreateRecordReader() throws Exception {
+    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    thrown.expect(IOException.class);
+    thrown.expectMessage(String.format("Null RecordReader object returned by %s",
+            mockInputFormat.getClass()));
+    Mockito.when(
+        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
+            Mockito.any(TaskAttemptContext.class))).thenReturn(null);
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit());
+    boundedSource.setInputFormatObj(mockInputFormat);
+    SourceTestUtils.readFromSource(boundedSource, p.getOptions());
+  }
+
+  /**
+   * This test validates behavior of
+   * {@link HadoopInputFormatBoundedSource.HadoopInputFormatReader#start() start()} method if
+   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns InputSplitList having zero
+   * records.
+   */
+  @Test
+  public void testReadersStartWhenZeroRecords() throws Exception {
+    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
+    Mockito.when(
+        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
+            Mockito.any(TaskAttemptContext.class))).thenReturn(mockReader);
+    Mockito.when(mockReader.nextKeyValue()).thenReturn(false);
+    InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit(mockInputSplit));
+    BoundedReader<KV<Text, Employee>> boundedReader = boundedSource.createReader(p.getOptions());
+    assertEquals(false, boundedReader.start());
+    assertEquals(Double.valueOf(1), boundedReader.getFractionConsumed());
+  }
+
+  /**
+   * This test validates the method getFractionConsumed()- which indicates the progress of the read
+   * in range of 0 to 1.
+   */
+  @Test
+  public void testReadersGetFractionConsumed() throws Exception {
+    List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData();
+    HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource(
+        EmployeeInputFormat.class,
+        Text.class,
+        Employee.class,
+        WritableCoder.of(Text.class),
+        AvroCoder.of(Employee.class));
+    long estimatedSize = hifSource.getEstimatedSizeBytes(p.getOptions());
+    // Validate if estimated size is equal to the size of records.
+    assertEquals(referenceRecords.size(), estimatedSize);
+    List<BoundedSource<KV<Text, Employee>>> boundedSourceList =
+        hifSource.splitIntoBundles(0, p.getOptions());
+    // Validate if splitIntoBundles() has split correctly.
+    assertEquals(TestEmployeeDataSet.NUMBER_OF_SPLITS, boundedSourceList.size());
+    List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
+    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
+      List<KV<Text, Employee>> elements = new ArrayList<KV<Text, Employee>>();
+      BoundedReader<KV<Text, Employee>> reader = source.createReader(p.getOptions());
+      float recordsRead = 0;
+      // When start is not called, getFractionConsumed() should return 0.
+      assertEquals(Double.valueOf(0), reader.getFractionConsumed());
+      boolean start = reader.start();
+      assertEquals(true, start);
+      if (start) {
+        elements.add(reader.getCurrent());
+        boolean advance = reader.advance();
+        // Validate if getFractionConsumed() returns the correct fraction based on
+        // the number of records read in the split.
+        assertEquals(
+            Double.valueOf(++recordsRead / TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT),
+            reader.getFractionConsumed());
+        assertEquals(true, advance);
+        while (advance) {
+          elements.add(reader.getCurrent());
+          advance = reader.advance();
+          assertEquals(
+              Double.valueOf(++recordsRead / TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT),
+              reader.getFractionConsumed());
+        }
+        bundleRecords.addAll(elements);
+      }
+      // Validate if getFractionConsumed() returns 1 after reading is complete.
+      assertEquals(Double.valueOf(1), reader.getFractionConsumed());
+      reader.close();
+    }
+    assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
+  }
+
+  /**
+   * This test validates that reader and its parent source reads the same records.
+   */
+  @Test
+  public void testReaderAndParentSourceReadsSameData() throws Exception {
+    InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit(mockInputSplit));
+    BoundedReader<KV<Text, Employee>> reader = boundedSource
+        .createReader(p.getOptions());
+    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(reader, p.getOptions());
+  }
+
+  /**
+   * This test verifies that the method
+   * {@link HadoopInputFormatBoundedSource.HadoopInputFormatReader#getCurrentSource()
+   * getCurrentSource()} returns correct source object.
+   */
+  @Test
+  public void testGetCurrentSourceFunction() throws Exception {
+    SerializableSplit split = new SerializableSplit();
+    BoundedSource<KV<Text, Employee>> source =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            split);
+    BoundedReader<KV<Text, Employee>> hifReader = source.createReader(p.getOptions());
+    BoundedSource<KV<Text, Employee>> hifSource = hifReader.getCurrentSource();
+    assertEquals(hifSource, source);
+  }
+
+  /**
+   * This test validates behavior of {@link HadoopInputFormatBoundedSource#createReader()
+   * createReader()} method when {@link HadoopInputFormatBoundedSource#splitIntoBundles()
+   * splitIntoBundles()} is not called.
+   */
+  @Test
+  public void testCreateReaderIfSplitIntoBundlesNotCalled() throws Exception {
+    HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource(
+        EmployeeInputFormat.class,
+        Text.class,
+        Employee.class,
+        WritableCoder.of(Text.class),
+        AvroCoder.of(Employee.class));
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Cannot create reader as source is not split yet.");
+    hifSource.createReader(p.getOptions());
+  }
+
+  /**
+   * This test validates behavior of
+   * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
+   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns empty list.
+   */
+  @Test
+  public void testComputeSplitsIfGetSplitsReturnsEmptyList() throws Exception {
+    InputFormat<?, ?> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class);
+    Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(
+        new ArrayList<InputSplit>());
+    HadoopInputFormatBoundedSource<Text, Employee> hifSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            mockInputSplit);
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Error in computing splits, getSplits() returns a empty list");
+    hifSource.setInputFormatObj(mockInputFormat);
+    hifSource.computeSplitsIfNecessary();
+  }
+
+  /**
+   * This test validates behavior of
+   * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
+   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns NULL value.
+   */
+  @Test
+  public void testComputeSplitsIfGetSplitsReturnsNullValue() throws Exception {
+    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class);
+    Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(null);
+    HadoopInputFormatBoundedSource<Text, Employee> hifSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            mockInputSplit);
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Error in computing splits, getSplits() returns null.");
+    hifSource.setInputFormatObj(mockInputFormat);
+    hifSource.computeSplitsIfNecessary();
+  }
+
+  /**
+   * This test validates behavior of
+   * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} if Hadoop
+   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns InputSplit list having some
+   * null values.
+   */
+  @Test
+  public void testComputeSplitsIfGetSplitsReturnsListHavingNullValues() throws Exception {
+    // InputSplit list having null value.
+    InputSplit mockInputSplit =
+        Mockito.mock(InputSplit.class, Mockito.withSettings().extraInterfaces(Writable.class));
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    inputSplitList.add(mockInputSplit);
+    inputSplitList.add(null);
+    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(
+        inputSplitList);
+    HadoopInputFormatBoundedSource<Text, Employee> hifSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit());
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Error in computing splits, split is null in InputSplits list populated "
+        + "by getSplits() : ");
+    hifSource.setInputFormatObj(mockInputFormat);
+    hifSource.computeSplitsIfNecessary();
+  }
+
+  /**
+   * This test validates records emitted in PCollection are immutable if InputFormat's recordReader
+   * returns same objects(i.e. same locations in memory) but with updated values for each record.
+   */
+  @Test
+  public void testImmutablityOfOutputOfReadIfRecordReaderObjectsAreMutable() throws Exception {
+    List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList(
+       ReuseObjectsEmployeeInputFormat.class,
+       Text.class,
+       Employee.class,
+       WritableCoder.of(Text.class),
+       AvroCoder.of(Employee.class));
+    List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
+    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
+      List<KV<Text, Employee>> elems = SourceTestUtils.readFromSource(source, p.getOptions());
+      bundleRecords.addAll(elems);
+    }
+    List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData();
+    assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
+  }
+
+  /**
+   * Test reading if InputFormat implements {@link org.apache.hadoop.conf.Configurable
+   * Configurable}.
+   */
+  @Test
+  public void testReadingWithConfigurableInputFormat() throws Exception {
+    List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList(
+        ConfigurableEmployeeInputFormat.class,
+        Text.class,
+        Employee.class,
+        WritableCoder.of(Text.class),
+        AvroCoder.of(Employee.class));
+    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
+      // Cast to HadoopInputFormatBoundedSource to access getInputFormat().
+      @SuppressWarnings("unchecked")
+      HadoopInputFormatBoundedSource<Text, Employee> hifSource =
+          (HadoopInputFormatBoundedSource<Text, Employee>) source;
+      hifSource.createInputFormatInstance();
+      ConfigurableEmployeeInputFormat inputFormatObj =
+          (ConfigurableEmployeeInputFormat) hifSource.getInputFormat();
+      assertEquals(true, inputFormatObj.isConfSet);
+    }
+  }
+
+  /**
+   * This test validates records emitted in PCollection are immutable if InputFormat's
+   * {@link org.apache.hadoop.mapreduce.RecordReader RecordReader} returns different objects (i.e.
+   * different locations in memory).
+   */
+  @Test
+  public void testImmutablityOfOutputOfReadIfRecordReaderObjectsAreImmutable() throws Exception {
+   List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList(
+       EmployeeInputFormat.class,
+       Text.class,
+       Employee.class,
+       WritableCoder.of(Text.class),
+       AvroCoder.of(Employee.class));
+    List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
+    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
+      List<KV<Text, Employee>> elems = SourceTestUtils.readFromSource(source, p.getOptions());
+      bundleRecords.addAll(elems);
+    }
+    List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData();
+    assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
+  }
+
+  private static SerializableConfiguration loadTestConfiguration(Class<?> inputFormatClassName,
+      Class<?> keyClass, Class<?> valueClass) {
+    Configuration conf = new Configuration();
+    conf.setClass("mapreduce.job.inputformat.class", inputFormatClassName, InputFormat.class);
+    conf.setClass("key.class", keyClass, Object.class);
+    conf.setClass("value.class", valueClass, Object.class);
+    return new SerializableConfiguration(conf);
+  }
+
+  private <K, V> HadoopInputFormatBoundedSource<K, V> getTestHIFSource(
+      Class<?> inputFormatClass,
+      Class<K> inputFormatKeyClass,
+      Class<V> inputFormatValueClass,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder){
+    SerializableConfiguration serConf =
+        loadTestConfiguration(
+            inputFormatClass,
+            inputFormatKeyClass,
+            inputFormatValueClass);
+    return new HadoopInputFormatBoundedSource<K, V>(
+            serConf,
+            keyCoder,
+            valueCoder,
+            null, // No key translation required.
+            null); // No value translation required.
+  }
+
+  private <K, V> List<BoundedSource<KV<K, V>>> getBoundedSourceList(
+      Class<?> inputFormatClass,
+      Class<K> inputFormatKeyClass,
+      Class<V> inputFormatValueClass,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) throws Exception{
+    HadoopInputFormatBoundedSource<K, V> boundedSource = getTestHIFSource(
+        inputFormatClass,
+        inputFormatKeyClass,
+        inputFormatValueClass,
+        keyCoder,
+        valueCoder);
+    return boundedSource.splitIntoBundles(0, p.getOptions());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
new file mode 100644
index 0000000..fbe74ec
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a valid InputFormat for reading employee data which is available in the form of
+ * {@code List<KV>} as {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList
+ * employeeDataList}. {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList
+ * employeeDataList} is populated using {@linkplain TestEmployeeDataSet#populateEmployeeDataNew()}.
+ *
+ * <p>{@linkplain ReuseObjectsEmployeeInputFormat} splits data into
+ * {@value TestEmployeeDataSet#NUMBER_OF_SPLITS} splits, each split having
+ * {@value TestEmployeeDataSet#NUMBER_OF_RECORDS_IN_EACH_SPLIT} records each.
+ * {@linkplain ReuseObjectsEmployeeInputFormat} reads data from
+ * {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList employeeDataList} and produces a
+ * key (employee id) of type Text and value of type {@linkplain Employee Employee}.
+ *
+ * <p>{@linkplain ReuseObjectsEmployeeInputFormat} is also input to test whether
+ * {@linkplain HadoopInputFormatIO } source returns immutable records for a scenario when
+ * RecordReader returns the same key and value objects with updating values every time it reads
+ * data.
+ */
+public class ReuseObjectsEmployeeInputFormat extends InputFormat<Text, Employee> {
+
+  public ReuseObjectsEmployeeInputFormat() {}
+
+  @Override
+  public RecordReader<Text, Employee> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new ReuseObjectsEmployeeRecordReader();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException {
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 1; i <= TestEmployeeDataSet.NUMBER_OF_SPLITS; i++) {
+      InputSplit inputSplitObj = new ReuseEmployeeInputSplit(
+          ((i - 1) * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT),
+          (i * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT - 1));
+      inputSplitList.add(inputSplitObj);
+    }
+    return inputSplitList;
+  }
+
+  /**
+   * InputSplit implementation for ReuseObjectsEmployeeInputFormat.
+   */
+  public class ReuseEmployeeInputSplit extends InputSplit implements Writable {
+    // Start and end map index of each split of employeeData.
+    private long startIndex;
+    private long endIndex;
+
+    public ReuseEmployeeInputSplit() {}
+
+    public ReuseEmployeeInputSplit(long startIndex, long endIndex) {
+      this.startIndex = startIndex;
+      this.endIndex = endIndex;
+    }
+
+    /** Returns number of records in each split. */
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return this.endIndex - this.startIndex + 1;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return null;
+    }
+
+
+    public long getStartIndex() {
+      return startIndex;
+    }
+
+    public long getEndIndex() {
+      return endIndex;
+    }
+
+    @Override
+    public void readFields(DataInput dataIn) throws IOException {
+      startIndex = dataIn.readLong();
+      endIndex = dataIn.readLong();
+    }
+
+    @Override
+    public void write(DataOutput dataOut) throws IOException {
+      dataOut.writeLong(startIndex);
+      dataOut.writeLong(endIndex);
+    }
+  }
+
+  /**
+   * RecordReader for ReuseObjectsEmployeeInputFormat.
+   */
+  public class ReuseObjectsEmployeeRecordReader extends RecordReader<Text, Employee> {
+
+    private ReuseEmployeeInputSplit split;
+    private Text currentKey = new Text();
+    private Employee currentValue = new Employee();
+    private long employeeListIndex = 0L;
+    private long recordsRead = 0L;
+    private List<KV<String, String>> employeeDataList;
+
+    public ReuseObjectsEmployeeRecordReader() {}
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public Text getCurrentKey() throws IOException, InterruptedException {
+      return currentKey;
+    }
+
+    @Override
+    public Employee getCurrentValue() throws IOException, InterruptedException {
+      return currentValue;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return (float) recordsRead / split.getLength();
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext arg1)
+        throws IOException, InterruptedException {
+      this.split = (ReuseEmployeeInputSplit) split;
+      employeeListIndex = this.split.getStartIndex() - 1;
+      recordsRead = 0;
+      employeeDataList = TestEmployeeDataSet.populateEmployeeData();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if ((recordsRead++) >= split.getLength()) {
+        return false;
+      }
+      employeeListIndex++;
+      KV<String, String> employeeDetails = employeeDataList.get((int) employeeListIndex);
+      String empData[] = employeeDetails.getValue().split("_");
+      // Updating the same key and value objects with new employee data.
+      currentKey.set(employeeDetails.getKey());
+      currentValue.setEmpName(empData[0]);
+      currentValue.setEmpAddress(empData[1]);
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
new file mode 100644
index 0000000..4a8fe95
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.Text;
+/**
+ * Test Utils used in {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat} for
+ * computing splits.
+ */
+public class TestEmployeeDataSet {
+  public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;
+  public static final long NUMBER_OF_SPLITS = 3L;
+  private static final List<KV<String, String>> data = new ArrayList<KV<String, String>>();
+
+  /**
+   * Returns List of employee details. Employee details are available in the form of {@link KV} in
+   * which, key indicates employee id and value indicates employee details such as name and address
+   * separated by '_'. This is data input to {@link EmployeeInputFormat} and
+   * {@link ReuseObjectsEmployeeInputFormat}.
+   */
+  public static List<KV<String, String>> populateEmployeeData() {
+    if (!data.isEmpty()) {
+      return data;
+    }
+    data.add(KV.of("0", "Alex_US"));
+    data.add(KV.of("1", "John_UK"));
+    data.add(KV.of("2", "Tom_UK"));
+    data.add(KV.of("3", "Nick_UAE"));
+    data.add(KV.of("4", "Smith_IND"));
+    data.add(KV.of("5", "Taylor_US"));
+    data.add(KV.of("6", "Gray_UK"));
+    data.add(KV.of("7", "James_UAE"));
+    data.add(KV.of("8", "Jordan_IND"));
+    data.add(KV.of("9", "Leena_UK"));
+    data.add(KV.of("10", "Zara_UAE"));
+    data.add(KV.of("11", "Talia_IND"));
+    data.add(KV.of("12", "Rose_UK"));
+    data.add(KV.of("13", "Kelvin_UAE"));
+    data.add(KV.of("14", "Goerge_IND"));
+    return data;
+  }
+
+  /**
+   * This is a helper function used in unit tests for validating data against data read using
+   * {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat}.
+   */
+  public static List<KV<Text, Employee>> getEmployeeData() {
+    return Lists.transform((data.isEmpty() ? populateEmployeeData() : data),
+        new Function<KV<String, String>, KV<Text, Employee>>() {
+          @Override
+          public KV<Text, Employee> apply(KV<String, String> input) {
+            String[] empData = input.getValue().split("_");
+            return KV.of(new Text(input.getKey()), new Employee(empData[0], empData[1]));
+          }
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
new file mode 100644
index 0000000..4c510ae
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
@@ -0,0 +1,278 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<!-- The HifIO tests for Cassandra and Elasticsearch both work only with
+  jdk1.8, but Beam's enforcer rules require jdk1.7 and jdk1.8 support. This
+  child module contains only those tests and overrides the enforcer rules to
+  allow 1.8 only behavior without making all of HifIO work only with jdk1.8. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>beam-sdks-java-io-hadoop-jdk1.8-tests</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop :: jdk1.8-tests</name>
+  <description>Integration tests and junits which need JDK1.8.</description>
+
+  <build>
+    <plugins>
+      <plugin>
+       <!-- Guava shading is required as Cassandra tests require version
+       19 of Guava, by default project wide Guava shading may not suffice as it
+       loads a different version of guava which will not work for Cassandra tests -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava:19.0</include>
+                </includes>
+              </artifactSet>
+              <relocations>
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.common</shadedPattern>
+                </relocation>
+               <relocation>
+                 <pattern>com.google.thirdparty</pattern>
+                 <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.thirdparty</shadedPattern>
+                 </relocation>
+               </relocations>
+               <transformers>
+                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+               </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Overridden enforcer plugin for JDK1.8 for running tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <enforceBytecodeVersion>
+                  <maxJdkVersion>1.8</maxJdkVersion>
+                  <excludes>
+                    <!-- Supplied by the user JDK and compiled with matching
+                      version. Is not shaded, so safe to ignore. -->
+                    <exclude>jdk.tools:jdk.tools</exclude>
+                  </excludes>
+                </enforceBytecodeVersion>
+                <requireJavaVersion>
+                  <version>[1.8,)</version>
+                </requireJavaVersion>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+        <dependencies>
+          <dependency>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>extra-enforcer-rules</artifactId>
+            <version>1.0-beta-6</version>
+          </dependency>
+        </dependencies>
+      </plugin>
+    </plugins>
+  </build>
+  <!--The dataflow-runner and spark-runner profiles support using those runners
+    during an integration test. These are not the long-term way we want to support
+    using runners in ITs (e.g. it is annoying to add to all IO modules.) We cannot
+    create a dependency IO -> Runners since the runners depend on IO (e.g. kafka
+    depends on spark.) -->
+
+  <profiles>
+    <!-- Include the Apache Spark runner -P spark-runner -->
+    <profile>
+      <id>spark-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-spark</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
+          <scope>runtime</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-core_2.10</artifactId>
+          <version>${spark.version}</version>
+          <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.slf4j</groupId>
+              <artifactId>jul-to-slf4j</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Google Cloud Dataflow runner -P dataflow-runner -->
+    <profile>
+      <id>dataflow-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <properties>
+    <log4j.core.version>2.6.2</log4j.core.version>
+    <hadoop.common.version>2.7.0</hadoop.common.version>
+    <guava.version>19.0</guava.version>
+    <transport.netty4.client.version>5.0.0</transport.netty4.client.version>
+    <netty.transport.native.epoll.version>4.1.0.CR3</netty.transport.native.epoll.version>
+    <elasticsearch.version>5.0.0</elasticsearch.version>
+    <cassandra.driver.mapping.version>3.1.1</cassandra.driver.mapping.version>
+    <cassandra.all.verison>3.9</cassandra.all.verison>
+    <cassandra.driver.core.version>3.1.1</cassandra.driver.core.version>
+    <commons.io.version>2.4</commons.io.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.common.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.common.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>2.6.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.plugin</groupId>
+      <artifactId>transport-netty4-client</artifactId>
+      <version>${transport.netty4.client.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>transport</artifactId>
+      <version>${elasticsearch.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <version>${netty.transport.native.epoll.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>${elasticsearch.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch-hadoop</artifactId>
+      <version>${elasticsearch.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-mapping</artifactId>
+      <version>${cassandra.driver.mapping.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>cassandra-all</artifactId>
+      <version>${cassandra.all.verison}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <version>${cassandra.driver.core.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- runtime dependencies -->
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${commons.io.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
new file mode 100644
index 0000000..599a4a1
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
+import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.elasticsearch.hadoop.mr.LinkedMapWritable;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeValidationException;
+import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests to validate HadoopInputFormatIO for embedded Elasticsearch instance.
+ *
+ * {@link EsInputFormat} can be used to read data from Elasticsearch. EsInputFormat by default
+ * returns key class as Text and value class as LinkedMapWritable. You can also set MapWritable as
+ * value class, provided that you set the property "mapred.mapoutput.value.class" with
+ * MapWritable.class. If this property is not set then, using MapWritable as value class may give
+ * org.apache.beam.sdk.coders.CoderException due to unexpected extra bytes after decoding.
+ */
+
+@RunWith(JUnit4.class)
+public class HIFIOWithElasticTest implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOGGER = LoggerFactory.getLogger(HIFIOWithElasticTest.class);
+  private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1";
+  private static final String ELASTIC_IN_MEM_PORT = "9200";
+  private static final String ELASTIC_INTERNAL_VERSION = "5.x";
+  private static final String TRUE = "true";
+  private static final String ELASTIC_INDEX_NAME = "beamdb";
+  private static final String ELASTIC_TYPE_NAME = "scientists";
+  private static final String ELASTIC_RESOURCE = "/" + ELASTIC_INDEX_NAME + "/" + ELASTIC_TYPE_NAME;
+  private static final int TEST_DATA_ROW_COUNT = 10;
+  private static final String ELASTIC_TYPE_ID_PREFIX = "s";
+
+  @ClassRule
+  public static TemporaryFolder elasticTempFolder = new TemporaryFolder();
+
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void startServer()
+      throws NodeValidationException, InterruptedException, IOException {
+    ElasticEmbeddedServer.startElasticEmbeddedServer();
+  }
+
+  /**
+   * Test to read data from embedded Elasticsearch instance and verify whether data is read
+   * successfully.
+   */
+  @Test
+  public void testHifIOWithElastic() {
+    // Expected hashcode is evaluated during insertion time one time and hardcoded here.
+    String expectedHashCode = "e2098f431f90193aa4545e033e6fd2217aafe7b6";
+    Configuration conf = getConfiguration();
+    PCollection<KV<Text, LinkedMapWritable>> esData =
+        pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
+    PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally());
+    // Verify that the count of objects fetched using HIFInputFormat IO is correct.
+    PAssert.thatSingleton(count).isEqualTo((long) TEST_DATA_ROW_COUNT);
+    PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create());
+    PCollection<String> textValues = values.apply(transformFunc);
+    // Verify the output values using checksum comparison.
+    PCollection<String> consolidatedHashcode =
+        textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
+    pipeline.run().waitUntilFinish();
+  }
+
+  MapElements<LinkedMapWritable, String> transformFunc =
+      MapElements.<LinkedMapWritable, String>via(new SimpleFunction<LinkedMapWritable, String>() {
+        @Override
+        public String apply(LinkedMapWritable mapw) {
+          return mapw.get(new Text("id")) + "|" + mapw.get(new Text("scientist"));
+        }
+      });
+  /**
+   * Test to read data from embedded Elasticsearch instance based on query and verify whether data
+   * is read successfully.
+   */
+  @Test
+  public void testHifIOWithElasticQuery() {
+    long expectedRowCount = 1L;
+    String expectedHashCode = "caa37dbd8258e3a7f98932958c819a57aab044ec";
+    Configuration conf = getConfiguration();
+    String fieldValue = ELASTIC_TYPE_ID_PREFIX + "2";
+    String query = "{"
+                  + "  \"query\": {"
+                  + "  \"match\" : {"
+                  + "    \"id\" : {"
+                  + "      \"query\" : \"" + fieldValue + "\","
+                  + "      \"type\" : \"boolean\""
+                  + "    }"
+                  + "  }"
+                  + "  }"
+                  + "}";
+    conf.set(ConfigurationOptions.ES_QUERY, query);
+    PCollection<KV<Text, LinkedMapWritable>> esData =
+        pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
+    PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally());
+    // Verify that the count of objects fetched using HIFInputFormat IO is correct.
+    PAssert.thatSingleton(count).isEqualTo(expectedRowCount);
+    PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create());
+    PCollection<String> textValues = values.apply(transformFunc);
+    // Verify the output values using checksum comparison.
+    PCollection<String> consolidatedHashcode =
+        textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Set the Elasticsearch configuration parameters in the Hadoop configuration object.
+   * Configuration object should have InputFormat class, key class and value class set. Mandatory
+   * fields for ESInputFormat to be set are es.resource, es.nodes, es.port, es.internal.es.version.
+   * Please refer to
+   * <a href="https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html"
+   * >Elasticsearch Configuration</a> for more details.
+   */
+  public Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(ConfigurationOptions.ES_NODES, ELASTIC_IN_MEM_HOSTNAME);
+    conf.set(ConfigurationOptions.ES_PORT, String.format("%s", ELASTIC_IN_MEM_PORT));
+    conf.set(ConfigurationOptions.ES_RESOURCE, ELASTIC_RESOURCE);
+    conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
+    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, TRUE);
+    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, TRUE);
+    conf.setClass("mapreduce.job.inputformat.class",
+        org.elasticsearch.hadoop.mr.EsInputFormat.class, InputFormat.class);
+    conf.setClass("key.class", Text.class, Object.class);
+    conf.setClass("value.class", LinkedMapWritable.class, Object.class);
+    return conf;
+ }
+
+  private static Map<String, String> createElasticRow(String id, String name) {
+    Map<String, String> data = new HashMap<String, String>();
+    data.put("id", id);
+    data.put("scientist", name);
+    return data;
+  }
+
+  @AfterClass
+  public static void shutdownServer() throws IOException {
+    ElasticEmbeddedServer.shutdown();
+  }
+
+  /**
+   * Class for in memory Elasticsearch server.
+   */
+  static class ElasticEmbeddedServer implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static Node node;
+
+    public static void startElasticEmbeddedServer()
+        throws NodeValidationException, InterruptedException {
+      Settings settings = Settings.builder()
+          .put("node.data", TRUE)
+          .put("network.host", ELASTIC_IN_MEM_HOSTNAME)
+          .put("http.port", ELASTIC_IN_MEM_PORT)
+          .put("path.data", elasticTempFolder.getRoot().getPath())
+          .put("path.home", elasticTempFolder.getRoot().getPath())
+          .put("transport.type", "local")
+          .put("http.enabled", TRUE)
+          .put("node.ingest", TRUE).build();
+      node = new PluginNode(settings);
+      node.start();
+      LOGGER.info("Elastic in memory server started.");
+      prepareElasticIndex();
+      LOGGER.info("Prepared index " + ELASTIC_INDEX_NAME
+          + "and populated data on elastic in memory server.");
+    }
+
+    /**
+     * Prepares Elastic index, by adding rows.
+     */
+    private static void prepareElasticIndex() throws InterruptedException {
+      CreateIndexRequest indexRequest = new CreateIndexRequest(ELASTIC_INDEX_NAME);
+      node.client().admin().indices().create(indexRequest).actionGet();
+      for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) {
+        node.client().prepareIndex(ELASTIC_INDEX_NAME, ELASTIC_TYPE_NAME, String.valueOf(i))
+            .setSource(createElasticRow(ELASTIC_TYPE_ID_PREFIX + i, "Faraday" + i)).execute()
+            .actionGet();
+      }
+      node.client().admin().indices().prepareRefresh(ELASTIC_INDEX_NAME).get();
+    }
+    /**
+     * Shutdown the embedded instance.
+     * @throws IOException
+     */
+    public static void shutdown() throws IOException {
+      DeleteIndexRequest indexRequest = new DeleteIndexRequest(ELASTIC_INDEX_NAME);
+      node.client().admin().indices().delete(indexRequest).actionGet();
+      LOGGER.info("Deleted index " + ELASTIC_INDEX_NAME + " from elastic in memory server");
+      node.close();
+      LOGGER.info("Closed elastic in memory server node.");
+      deleteElasticDataDirectory();
+    }
+
+    private static void deleteElasticDataDirectory() {
+      try {
+        FileUtils.deleteDirectory(new File(elasticTempFolder.getRoot().getPath()));
+      } catch (IOException e) {
+        throw new RuntimeException("Could not delete elastic data directory: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Class created for handling "http.enabled" property as "true" for Elasticsearch node.
+   */
+  static class PluginNode extends Node implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    static Collection<Class<? extends Plugin>> list = new ArrayList<Class<? extends Plugin>>();
+    static {
+      list.add(Netty4Plugin.class);
+    }
+
+    public PluginNode(final Settings settings) {
+      super(InternalSettingsPreparer.prepareEnvironment(settings, null), list);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java
new file mode 100644
index 0000000..2e89ed1
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat.custom.options;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * Properties needed when using HadoopInputFormatIO with the Beam SDK.
+ */
+public interface HIFTestOptions extends TestPipelineOptions {
+
+  //Cassandra test options
+  @Description("Cassandra Server IP")
+  @Default.String("cassandraServerIp")
+  String getCassandraServerIp();
+  void setCassandraServerIp(String cassandraServerIp);
+  @Description("Cassandra Server port")
+  @Default.Integer(0)
+  Integer getCassandraServerPort();
+  void setCassandraServerPort(Integer cassandraServerPort);
+  @Description("Cassandra User name")
+  @Default.String("cassandraUserName")
+  String getCassandraUserName();
+  void setCassandraUserName(String cassandraUserName);
+  @Description("Cassandra Password")
+  @Default.String("cassandraPassword")
+  String getCassandraPassword();
+  void setCassandraPassword(String cassandraPassword);
+
+  //Elasticsearch test options
+  @Description("Elasticsearch Server IP")
+  @Default.String("elasticServerIp")
+  String getElasticServerIp();
+  void setElasticServerIp(String elasticServerIp);
+  @Description("Elasticsearch Server port")
+  @Default.Integer(0)
+  Integer getElasticServerPort();
+  void setElasticServerPort(Integer elasticServerPort);
+  @Description("Elasticsearch User name")
+  @Default.String("elasticUserName")
+  String getElasticUserName();
+  void setElasticUserName(String elasticUserName);
+  @Description("Elastic Password")
+  @Default.String("elasticPassword")
+  String getElasticPassword();
+  void setElasticPassword(String elasticPassword);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
new file mode 100644
index 0000000..fe37048
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat.hashing;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a
+ * HashCode.
+ */
+public class HashingFn extends CombineFn<String, HashingFn.Accum, String> {
+
+  /**
+   * Serializable Class to store the HashCode of input String.
+   */
+  public static class Accum implements Serializable {
+    HashCode hashCode = null;
+
+    public Accum(HashCode value) {
+      this.hashCode = value;
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+      out.defaultWriteObject();
+    }
+  }
+
+  @Override
+  public Accum addInput(Accum accum, String input) {
+    List<HashCode> elementHashes = Lists.newArrayList();
+     if (accum.hashCode != null) {
+      elementHashes.add(accum.hashCode);
+    }
+    HashCode inputHashCode = Hashing.sha1().hashString(input, StandardCharsets.UTF_8);
+    elementHashes.add(inputHashCode);
+    accum.hashCode = Hashing.combineUnordered(elementHashes);
+    return accum;
+  }
+
+  @Override
+  public Accum mergeAccumulators(Iterable<Accum> accums) {
+    Accum merged = createAccumulator();
+    List<HashCode> elementHashes = Lists.newArrayList();
+    for (Accum accum : accums) {
+      if (accum.hashCode != null) {
+        elementHashes.add(accum.hashCode);
+      }
+    }
+    merged.hashCode = Hashing.combineUnordered(elementHashes);
+    return merged;
+  }
+
+  @Override
+  public String extractOutput(Accum accum) {
+    // Return the combined hash code of list of elements in the Pcollection.
+    String consolidatedHash = "";
+    if (accum.hashCode != null) {
+      consolidatedHash = accum.hashCode.toString();
+    }
+    return consolidatedHash;
+  }
+
+  @Override
+  public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder)
+      throws CannotProvideCoderException {
+    return SerializableCoder.of(Accum.class);
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) {
+    return inputCoder;
+  }
+
+  @Override
+  public Accum createAccumulator() {
+    return new Accum(null);
+  }
+}