Posted to commits@beam.apache.org by da...@apache.org on 2017/04/06 14:33:24 UTC

[5/7] beam git commit: HadoopInputFormatIO with junits

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
deleted file mode 100644
index c25cf51..0000000
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.beam.sdk.io.hadoop.inputformat;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.EmployeeRecordReader;
-import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsEmployeeInputSplit;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/**
- * Unit tests for {@link HadoopInputFormatIO}.
- */
-@RunWith(JUnit4.class)
-public class HadoopInputFormatIOTest {
-  static SerializableConfiguration serConf;
-  static SimpleFunction<Text, String> myKeyTranslate;
-  static SimpleFunction<Employee, String> myValueTranslate;
-
-  @Rule public final transient TestPipeline p = TestPipeline.create();
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private PBegin input = PBegin.in(p);
-
-  @BeforeClass
-  public static void setUp() throws IOException, InterruptedException {
-    serConf = loadTestConfiguration(
-                  EmployeeInputFormat.class,
-                  Text.class,
-                  Employee.class);
-    myKeyTranslate = new SimpleFunction<Text, String>() {
-      @Override
-      public String apply(Text input) {
-        return input.toString();
-      }
-    };
-    myValueTranslate = new SimpleFunction<Employee, String>() {
-      @Override
-      public String apply(Employee input) {
-        return input.getEmpName() + "_" + input.getEmpAddress();
-      }
-    };
-  }
-
-  @Test
-  public void testReadBuildsCorrectly() {
-    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslate)
-        .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
-    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
-    assertEquals(myValueTranslate, read.getValueTranslationFunction());
-    assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor());
-    assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
-  }
-
-  /**
-   * This test validates that {@link HadoopInputFormatIO.Read Read} builds correctly when the
-   * configuration, key translation and value translation are set in a different order. This test
-   * also validates that the output PCollection key/value classes are set correctly even if the
-   * Hadoop configuration is set after setting the key/value translation.
-   */
-  @Test
-  public void testReadBuildsCorrectlyInDifferentOrder() {
-    HadoopInputFormatIO.Read<String, String> read =
-        HadoopInputFormatIO.<String, String>read()
-            .withValueTranslation(myValueTranslate)
-            .withConfiguration(serConf.getHadoopConfiguration())
-            .withKeyTranslation(myKeyTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
-    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
-    assertEquals(myValueTranslate, read.getValueTranslationFunction());
-    assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
-    assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor());
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} object creation if
-   * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()} is called more than
-   * once.
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  @Test
-  public void testReadBuildsCorrectlyIfWithConfigurationIsCalledMoreThanOneTime()
-      throws IOException, InterruptedException {
-    SerializableConfiguration diffConf =
-        loadTestConfiguration(
-            EmployeeInputFormat.class,
-            Employee.class,
-            Text.class);
-    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslate)
-        .withConfiguration(diffConf.getHadoopConfiguration());
-    assertEquals(diffConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
-    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
-    assertEquals(null, read.getValueTranslationFunction());
-    assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
-    assertEquals(diffConf.getHadoopConfiguration().getClass("value.class", Object.class), read
-        .getValueTypeDescriptor().getRawType());
-  }
-
-  /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#populateDisplayData()
-   * populateDisplayData()}.
-   */
-  @Test
-  public void testReadDisplayData() {
-    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslate)
-        .withValueTranslation(myValueTranslate);
-    DisplayData displayData = DisplayData.from(read);
-    Iterator<Entry<String, String>> propertyElement = serConf.getHadoopConfiguration().iterator();
-    while (propertyElement.hasNext()) {
-      Entry<String, String> element = propertyElement.next();
-      assertThat(displayData, hasDisplayItem(element.getKey(), element.getValue()));
-    }
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
-   * null configuration. {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}
-   * method checks configuration is null and throws exception if it is null.
-   */
-  @Test
-  public void testReadObjectCreationFailsIfConfigurationIsNull() {
-    thrown.expect(NullPointerException.class);
-    HadoopInputFormatIO.<Text, Employee>read()
-          .withConfiguration(null);
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with only
-   * configuration.
-   */
-  @Test
-  public void testReadObjectCreationWithConfiguration() {
-    HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration());
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
-    assertEquals(null, read.getKeyTranslationFunction());
-    assertEquals(null, read.getValueTranslationFunction());
-    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class), read
-        .getKeyTypeDescriptor().getRawType());
-    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class), read
-        .getValueTypeDescriptor().getRawType());
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
-   * configuration and null key translation. {@link HadoopInputFormatIO.Read#withKeyTranslation()
-   * withKeyTranslation()} checks whether keyTranslation is null and throws an exception if a null
-   * value is passed.
-   */
-  @Test
-  public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() {
-    thrown.expect(NullPointerException.class);
-    HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(null);
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with
-   * configuration and key translation.
-   */
-  @Test
-  public void testReadObjectCreationWithConfigurationKeyTranslation() {
-    HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
-    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
-    assertEquals(null, read.getValueTranslationFunction());
-    assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
-        read.getKeyTypeDescriptor().getRawType());
-    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class),
-        read.getValueTypeDescriptor().getRawType());
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
-   * configuration and null value translation.
-   * {@link HadoopInputFormatIO.Read#withValueTranslation() withValueTranslation()} checks
-   * whether valueTranslation is null and throws an exception if a null value is passed.
-   */
-  @Test
-  public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() {
-    thrown.expect(NullPointerException.class);
-    HadoopInputFormatIO.<Text, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withValueTranslation(null);
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with
-   * configuration and value translation.
-   */
-  @Test
-  public void testReadObjectCreationWithConfigurationValueTranslation() {
-    HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
-    assertEquals(null, read.getKeyTranslationFunction());
-    assertEquals(myValueTranslate, read.getValueTranslationFunction());
-    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class),
-        read.getKeyTypeDescriptor().getRawType());
-    assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(),
-        read.getValueTypeDescriptor().getRawType());
-  }
-
-  /**
-   * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with
-   * configuration, key translation and value translation.
-   */
-  @Test
-  public void testReadObjectCreationWithConfigurationKeyTranslationValueTranslation() {
-    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslate)
-        .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
-    assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
-    assertEquals(myValueTranslate, read.getValueTranslationFunction());
-    assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
-        read.getKeyTypeDescriptor().getRawType());
-    assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(),
-        read.getValueTypeDescriptor().getRawType());
-  }
-
-  /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when Read transform is created without calling
-   * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}.
-   */
-  @Test
-  public void testReadValidationFailsMissingConfiguration() {
-    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read();
-    thrown.expect(NullPointerException.class);
-    read.validate(input);
-  }
-
-  /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration()
-   * withConfiguration()} function when Hadoop InputFormat class is not provided by the user in
-   * configuration.
-   */
-  @Test
-  public void testReadValidationFailsMissingInputFormatInConf() {
-    Configuration configuration = new Configuration();
-    configuration.setClass("key.class", Text.class, Object.class);
-    configuration.setClass("value.class", Employee.class, Object.class);
-    thrown.expect(NullPointerException.class);
-    HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(configuration);
-  }
-
-  /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration()
-   * withConfiguration()} function when key class is not provided by the user in configuration.
-   */
-  @Test
-  public void testReadValidationFailsMissingKeyClassInConf() {
-    Configuration configuration = new Configuration();
-    configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class,
-        InputFormat.class);
-    configuration.setClass("value.class", Employee.class, Object.class);
-    thrown.expect(NullPointerException.class);
-    HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(configuration);
-  }
-
-  /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration()
-   * withConfiguration()} function when value class is not provided by the user in configuration.
-   */
-  @Test
-  public void testReadValidationFailsMissingValueClassInConf() {
-    Configuration configuration = new Configuration();
-    configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class,
-        InputFormat.class);
-    configuration.setClass("key.class", Text.class, Object.class);
-    thrown.expect(NullPointerException.class);
-    HadoopInputFormatIO.<Text, Employee>read().withConfiguration(configuration);
-  }
-
-  /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when myKeyTranslate's (a simple function provided by the user for
-   * key translation) input type is not the same as the Hadoop InputFormat's key class (which is
-   * the property set in the configuration as "key.class").
-   */
-  @Test
-  public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() {
-    SimpleFunction<LongWritable, String> myKeyTranslateWithWrongInputType =
-        new SimpleFunction<LongWritable, String>() {
-          @Override
-          public String apply(LongWritable input) {
-            return input.toString();
-          }
-        };
-    HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslateWithWrongInputType);
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(String.format(
-        "Key translation's input type is not same as hadoop InputFormat : %s key " + "class : %s",
-        serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
-            InputFormat.class), serConf.getHadoopConfiguration()
-            .getClass("key.class", Object.class)));
-    read.validate(input);
-  }
-
-  /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when myValueTranslate's (a simple function provided by the user for
-   * value translation) input type is not the same as the Hadoop InputFormat's value class (which
-   * is the property set in the configuration as "value.class").
-   */
-  @Test
-  public void testReadValidationFailsWithWrongInputTypeValueTranslationFunction() {
-    SimpleFunction<LongWritable, String> myValueTranslateWithWrongInputType =
-        new SimpleFunction<LongWritable, String>() {
-          @Override
-          public String apply(LongWritable input) {
-            return input.toString();
-          }
-        };
-    HadoopInputFormatIO.Read<Text, String> read =
-        HadoopInputFormatIO.<Text, String>read()
-            .withConfiguration(serConf.getHadoopConfiguration())
-            .withValueTranslation(myValueTranslateWithWrongInputType);
-    String expectedMessage =
-        String.format(
-            "Value translation's input type is not same as hadoop InputFormat :  "
-                + "%s value class : %s",
-            serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
-                InputFormat.class),
-            serConf.getHadoopConfiguration().getClass("value.class", Object.class));
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(expectedMessage);
-    read.validate(input);
-  }
-
-  /**
-   * This test validates reading from Hadoop InputFormat if wrong key class is set in
-   * configuration.
-   */
-  @Test
-  public void testReadFailsWithWrongKeyClass() {
-    SerializableConfiguration wrongConf = loadTestConfiguration(
-       EmployeeInputFormat.class,
-       MapWritable.class, // Actual key class is Text.class.
-       Employee.class);
-    HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read()
-        .withConfiguration(wrongConf.getHadoopConfiguration());
-    String expectedMessage =
-        String.format("java.lang.IllegalArgumentException: " + "Wrong InputFormat key class in "
-            + "configuration : Expected key.class is %s but was %s.", Text.class.getName(),
-            MapWritable.class.getName());
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectMessage(expectedMessage);
-    p.apply("ReadTest", read);
-    p.run();
-  }
-
-  /**
-   * This test validates reading from Hadoop InputFormat if wrong value class is set in
-   * configuration.
-   */
-  @Test
-  public void testReadFailsWithWrongValueClass() {
-    SerializableConfiguration wrongConf = loadTestConfiguration(
-       EmployeeInputFormat.class,
-       Text.class,
-       MapWritable.class); // Actual value class is Employee.class.
-    HadoopInputFormatIO.Read<Text, MapWritable> read = HadoopInputFormatIO.<Text, MapWritable>read()
-        .withConfiguration(wrongConf.getHadoopConfiguration());
-    String expectedMessage =
-        String.format("java.lang.IllegalArgumentException: "
-            + "Wrong InputFormat value class in configuration : "
-            + "Expected value.class is %s but was %s.", Employee.class.getName(),
-            MapWritable.class.getName());
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectMessage(expectedMessage);
-    p.apply("ReadTest", read);
-    p.run();
-  }
-
-  @Test
-  public void testReadingData() throws Exception {
-    HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration());
-    List<KV<Text, Employee>> expected = TestEmployeeDataSet.getEmployeeData();
-    PCollection<KV<Text, Employee>> actual = p.apply("ReadTest", read);
-    PAssert.that(actual).containsInAnyOrder(expected);
-    p.run();
-  }
-
-  /**
-   * This test validates behavior of {@link HadoopInputFormatBoundedSource} if RecordReader object
-   * creation fails.
-   */
-  @Test
-  public void testReadIfCreateRecordReaderFails() throws Exception {
-    thrown.expect(Exception.class);
-    thrown.expectMessage("Exception in creating RecordReader");
-    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
-    Mockito.when(
-        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
-            Mockito.any(TaskAttemptContext.class))).thenThrow(
-        new IOException("Exception in creating RecordReader"));
-    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            new SerializableSplit());
-    boundedSource.setInputFormatObj(mockInputFormat);
-    SourceTestUtils.readFromSource(boundedSource, p.getOptions());
-  }
-
-  /**
-   * This test validates behavior of HadoopInputFormatSource if
-   * {@link InputFormat#createRecordReader() createRecordReader()} of InputFormat returns null.
-   */
-  @Test
-  public void testReadWithNullCreateRecordReader() throws Exception {
-    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
-    thrown.expect(IOException.class);
-    thrown.expectMessage(String.format("Null RecordReader object returned by %s",
-            mockInputFormat.getClass()));
-    Mockito.when(
-        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
-            Mockito.any(TaskAttemptContext.class))).thenReturn(null);
-    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            new SerializableSplit());
-    boundedSource.setInputFormatObj(mockInputFormat);
-    SourceTestUtils.readFromSource(boundedSource, p.getOptions());
-  }
-
-  /**
-   * This test validates behavior of
-   * {@link HadoopInputFormatBoundedSource.HadoopInputFormatReader#start() start()} method if
-   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns an InputSplit list having
-   * zero records.
-   */
-  @Test
-  public void testReadersStartWhenZeroRecords() throws Exception {
-    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
-    EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
-    Mockito.when(
-        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
-            Mockito.any(TaskAttemptContext.class))).thenReturn(mockReader);
-    Mockito.when(mockReader.nextKeyValue()).thenReturn(false);
-    InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
-    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            new SerializableSplit(mockInputSplit));
-    BoundedReader<KV<Text, Employee>> boundedReader = boundedSource.createReader(p.getOptions());
-    assertEquals(false, boundedReader.start());
-    assertEquals(Double.valueOf(1), boundedReader.getFractionConsumed());
-  }
-
-  /**
-   * This test validates the method getFractionConsumed(), which indicates the progress of the
-   * read in the range of 0 to 1.
-   */
-  @Test
-  public void testReadersGetFractionConsumed() throws Exception {
-    List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData();
-    HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource(
-        EmployeeInputFormat.class,
-        Text.class,
-        Employee.class,
-        WritableCoder.of(Text.class),
-        AvroCoder.of(Employee.class));
-    long estimatedSize = hifSource.getEstimatedSizeBytes(p.getOptions());
-    // Validate if estimated size is equal to the size of records.
-    assertEquals(referenceRecords.size(), estimatedSize);
-    List<BoundedSource<KV<Text, Employee>>> boundedSourceList =
-        hifSource.splitIntoBundles(0, p.getOptions());
-    // Validate if splitIntoBundles() has split correctly.
-    assertEquals(TestEmployeeDataSet.NUMBER_OF_SPLITS, boundedSourceList.size());
-    List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
-    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
-      List<KV<Text, Employee>> elements = new ArrayList<KV<Text, Employee>>();
-      BoundedReader<KV<Text, Employee>> reader = source.createReader(p.getOptions());
-      float recordsRead = 0;
-      // When start is not called, getFractionConsumed() should return 0.
-      assertEquals(Double.valueOf(0), reader.getFractionConsumed());
-      boolean start = reader.start();
-      assertEquals(true, start);
-      if (start) {
-        elements.add(reader.getCurrent());
-        boolean advance = reader.advance();
-        // Validate if getFractionConsumed() returns the correct fraction based on
-        // the number of records read in the split.
-        assertEquals(
-            Double.valueOf(++recordsRead / TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT),
-            reader.getFractionConsumed());
-        assertEquals(true, advance);
-        while (advance) {
-          elements.add(reader.getCurrent());
-          advance = reader.advance();
-          assertEquals(
-              Double.valueOf(++recordsRead / TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT),
-              reader.getFractionConsumed());
-        }
-        bundleRecords.addAll(elements);
-      }
-      // Validate if getFractionConsumed() returns 1 after reading is complete.
-      assertEquals(Double.valueOf(1), reader.getFractionConsumed());
-      reader.close();
-    }
-    assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
-  }
-
-  /**
-   * This test validates that the reader and its parent source read the same records.
-   */
-  @Test
-  public void testReaderAndParentSourceReadsSameData() throws Exception {
-    InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
-    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            new SerializableSplit(mockInputSplit));
-    BoundedReader<KV<Text, Employee>> reader = boundedSource
-        .createReader(p.getOptions());
-    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(reader, p.getOptions());
-  }
-
-  /**
-   * This test verifies that the method
-   * {@link HadoopInputFormatBoundedSource.HadoopInputFormatReader#getCurrentSource()
-   * getCurrentSource()} returns the correct source object.
-   */
-  @Test
-  public void testGetCurrentSourceFunction() throws Exception {
-    SerializableSplit split = new SerializableSplit();
-    BoundedSource<KV<Text, Employee>> source =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            split);
-    BoundedReader<KV<Text, Employee>> hifReader = source.createReader(p.getOptions());
-    BoundedSource<KV<Text, Employee>> hifSource = hifReader.getCurrentSource();
-    assertEquals(hifSource, source);
-  }
-
-  /**
-   * This test validates behavior of {@link HadoopInputFormatBoundedSource#createReader()
-   * createReader()} method when {@link HadoopInputFormatBoundedSource#splitIntoBundles()
-   * splitIntoBundles()} is not called.
-   */
-  @Test
-  public void testCreateReaderIfSplitIntoBundlesNotCalled() throws Exception {
-    HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource(
-        EmployeeInputFormat.class,
-        Text.class,
-        Employee.class,
-        WritableCoder.of(Text.class),
-        AvroCoder.of(Employee.class));
-    thrown.expect(IOException.class);
-    thrown.expectMessage("Cannot create reader as source is not split yet.");
-    hifSource.createReader(p.getOptions());
-  }
-
-  /**
-   * This test validates behavior of
-   * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
-   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns an empty list.
-   */
-  @Test
-  public void testComputeSplitsIfGetSplitsReturnsEmptyList() throws Exception {
-    InputFormat<?, ?> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
-    SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class);
-    Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(
-        new ArrayList<InputSplit>());
-    HadoopInputFormatBoundedSource<Text, Employee> hifSource =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            mockInputSplit);
-    thrown.expect(IOException.class);
-    thrown.expectMessage("Error in computing splits, getSplits() returns a empty list");
-    hifSource.setInputFormatObj(mockInputFormat);
-    hifSource.computeSplitsIfNecessary();
-  }
-
-  /**
-   * This test validates behavior of
-   * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
-   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns a null value.
-   */
-  @Test
-  public void testComputeSplitsIfGetSplitsReturnsNullValue() throws Exception {
-    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
-    SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class);
-    Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(null);
-    HadoopInputFormatBoundedSource<Text, Employee> hifSource =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            mockInputSplit);
-    thrown.expect(IOException.class);
-    thrown.expectMessage("Error in computing splits, getSplits() returns null.");
-    hifSource.setInputFormatObj(mockInputFormat);
-    hifSource.computeSplitsIfNecessary();
-  }
-
-  /**
-   * This test validates behavior of
-   * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} if Hadoop
-   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns an InputSplit list having
-   * some null values.
-   */
-  @Test
-  public void testComputeSplitsIfGetSplitsReturnsListHavingNullValues() throws Exception {
-    // InputSplit list having null value.
-    InputSplit mockInputSplit =
-        Mockito.mock(InputSplit.class, Mockito.withSettings().extraInterfaces(Writable.class));
-    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-    inputSplitList.add(mockInputSplit);
-    inputSplitList.add(null);
-    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
-    Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(
-        inputSplitList);
-    HadoopInputFormatBoundedSource<Text, Employee> hifSource =
-        new HadoopInputFormatBoundedSource<Text, Employee>(
-            serConf,
-            WritableCoder.of(Text.class),
-            AvroCoder.of(Employee.class),
-            null, // No key translation required.
-            null, // No value translation required.
-            new SerializableSplit());
-    thrown.expect(IOException.class);
-    thrown.expectMessage("Error in computing splits, split is null in InputSplits list populated "
-        + "by getSplits() : ");
-    hifSource.setInputFormatObj(mockInputFormat);
-    hifSource.computeSplitsIfNecessary();
-  }
-
-  /**
-   * This test validates that records emitted in the PCollection are immutable if the InputFormat's
-   * RecordReader returns the same objects (i.e. the same locations in memory) but with updated
-   * values for each record.
-   */
-  @Test
-  public void testImmutablityOfOutputOfReadIfRecordReaderObjectsAreMutable() throws Exception {
-    List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList(
-       ReuseObjectsEmployeeInputFormat.class,
-       Text.class,
-       Employee.class,
-       WritableCoder.of(Text.class),
-       AvroCoder.of(Employee.class));
-    List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
-    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
-      List<KV<Text, Employee>> elems = SourceTestUtils.readFromSource(source, p.getOptions());
-      bundleRecords.addAll(elems);
-    }
-    List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData();
-    assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
-  }
-
-  /**
-   * Test reading if InputFormat implements {@link org.apache.hadoop.conf.Configurable
-   * Configurable}.
-   */
-  @Test
-  public void testReadingWithConfigurableInputFormat() throws Exception {
-    List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList(
-        ConfigurableEmployeeInputFormat.class,
-        Text.class,
-        Employee.class,
-        WritableCoder.of(Text.class),
-        AvroCoder.of(Employee.class));
-    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
-      // Cast to HadoopInputFormatBoundedSource to access getInputFormat().
-      @SuppressWarnings("unchecked")
-      HadoopInputFormatBoundedSource<Text, Employee> hifSource =
-          (HadoopInputFormatBoundedSource<Text, Employee>) source;
-      hifSource.createInputFormatInstance();
-      ConfigurableEmployeeInputFormat inputFormatObj =
-          (ConfigurableEmployeeInputFormat) hifSource.getInputFormat();
-      assertEquals(true, inputFormatObj.isConfSet);
-    }
-  }
-
-  /**
-   * This test validates that records emitted in the PCollection are immutable if the InputFormat's
-   * {@link org.apache.hadoop.mapreduce.RecordReader RecordReader} returns different objects (i.e.
-   * different locations in memory).
-   */
-  @Test
-  public void testImmutablityOfOutputOfReadIfRecordReaderObjectsAreImmutable() throws Exception {
-   List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList(
-       EmployeeInputFormat.class,
-       Text.class,
-       Employee.class,
-       WritableCoder.of(Text.class),
-       AvroCoder.of(Employee.class));
-    List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
-    for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
-      List<KV<Text, Employee>> elems = SourceTestUtils.readFromSource(source, p.getOptions());
-      bundleRecords.addAll(elems);
-    }
-    List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData();
-    assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
-  }
-
-  private static SerializableConfiguration loadTestConfiguration(Class<?> inputFormatClassName,
-      Class<?> keyClass, Class<?> valueClass) {
-    Configuration conf = new Configuration();
-    conf.setClass("mapreduce.job.inputformat.class", inputFormatClassName, InputFormat.class);
-    conf.setClass("key.class", keyClass, Object.class);
-    conf.setClass("value.class", valueClass, Object.class);
-    return new SerializableConfiguration(conf);
-  }
-
-  private <K, V> HadoopInputFormatBoundedSource<K, V> getTestHIFSource(
-      Class<?> inputFormatClass,
-      Class<K> inputFormatKeyClass,
-      Class<V> inputFormatValueClass,
-      Coder<K> keyCoder,
-      Coder<V> valueCoder){
-    SerializableConfiguration serConf =
-        loadTestConfiguration(
-            inputFormatClass,
-            inputFormatKeyClass,
-            inputFormatValueClass);
-    return new HadoopInputFormatBoundedSource<K, V>(
-            serConf,
-            keyCoder,
-            valueCoder,
-            null, // No key translation required.
-            null); // No value translation required.
-  }
-
-  private <K, V> List<BoundedSource<KV<K, V>>> getBoundedSourceList(
-      Class<?> inputFormatClass,
-      Class<K> inputFormatKeyClass,
-      Class<V> inputFormatValueClass,
-      Coder<K> keyCoder,
-      Coder<V> valueCoder) throws Exception{
-    HadoopInputFormatBoundedSource<K, V> boundedSource = getTestHIFSource(
-        inputFormatClass,
-        inputFormatKeyClass,
-        inputFormatValueClass,
-        keyCoder,
-        valueCoder);
-    return boundedSource.splitIntoBundles(0, p.getOptions());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
deleted file mode 100644
index fbe74ec..0000000
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.beam.sdk.io.hadoop.inputformat;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * This is a valid InputFormat for reading employee data which is available in the form of
- * {@code List<KV>} as {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList
- * employeeDataList}. {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList
- * employeeDataList} is populated using {@linkplain TestEmployeeDataSet#populateEmployeeData()}.
- *
- * <p>{@linkplain ReuseObjectsEmployeeInputFormat} splits data into
- * {@value TestEmployeeDataSet#NUMBER_OF_SPLITS} splits, each split having
- * {@value TestEmployeeDataSet#NUMBER_OF_RECORDS_IN_EACH_SPLIT} records.
- * {@linkplain ReuseObjectsEmployeeInputFormat} reads data from
- * {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList employeeDataList} and produces a
- * key (employee id) of type Text and value of type {@linkplain Employee Employee}.
- *
- * <p>{@linkplain ReuseObjectsEmployeeInputFormat} is also used as input to test whether the
- * {@linkplain HadoopInputFormatIO} source returns immutable records for a scenario in which the
- * RecordReader returns the same key and value objects, updating their values every time it reads
- * data.
- */
-public class ReuseObjectsEmployeeInputFormat extends InputFormat<Text, Employee> {
-
-  public ReuseObjectsEmployeeInputFormat() {}
-
-  @Override
-  public RecordReader<Text, Employee> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new ReuseObjectsEmployeeRecordReader();
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException {
-    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-    for (int i = 1; i <= TestEmployeeDataSet.NUMBER_OF_SPLITS; i++) {
-      InputSplit inputSplitObj = new ReuseEmployeeInputSplit(
-          ((i - 1) * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT),
-          (i * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT - 1));
-      inputSplitList.add(inputSplitObj);
-    }
-    return inputSplitList;
-  }
-
-  /**
-   * InputSplit implementation for ReuseObjectsEmployeeInputFormat.
-   */
-  public class ReuseEmployeeInputSplit extends InputSplit implements Writable {
-    // Start and end map index of each split of employeeData.
-    private long startIndex;
-    private long endIndex;
-
-    public ReuseEmployeeInputSplit() {}
-
-    public ReuseEmployeeInputSplit(long startIndex, long endIndex) {
-      this.startIndex = startIndex;
-      this.endIndex = endIndex;
-    }
-
-    /** Returns number of records in each split. */
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-      return this.endIndex - this.startIndex + 1;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-      return null;
-    }
-
-
-    public long getStartIndex() {
-      return startIndex;
-    }
-
-    public long getEndIndex() {
-      return endIndex;
-    }
-
-    @Override
-    public void readFields(DataInput dataIn) throws IOException {
-      startIndex = dataIn.readLong();
-      endIndex = dataIn.readLong();
-    }
-
-    @Override
-    public void write(DataOutput dataOut) throws IOException {
-      dataOut.writeLong(startIndex);
-      dataOut.writeLong(endIndex);
-    }
-  }
-
-  /**
-   * RecordReader for ReuseObjectsEmployeeInputFormat.
-   */
-  public class ReuseObjectsEmployeeRecordReader extends RecordReader<Text, Employee> {
-
-    private ReuseEmployeeInputSplit split;
-    private Text currentKey = new Text();
-    private Employee currentValue = new Employee();
-    private long employeeListIndex = 0L;
-    private long recordsRead = 0L;
-    private List<KV<String, String>> employeeDataList;
-
-    public ReuseObjectsEmployeeRecordReader() {}
-
-    @Override
-    public void close() throws IOException {}
-
-    @Override
-    public Text getCurrentKey() throws IOException, InterruptedException {
-      return currentKey;
-    }
-
-    @Override
-    public Employee getCurrentValue() throws IOException, InterruptedException {
-      return currentValue;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return (float) recordsRead / split.getLength();
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext arg1)
-        throws IOException, InterruptedException {
-      this.split = (ReuseEmployeeInputSplit) split;
-      employeeListIndex = this.split.getStartIndex() - 1;
-      recordsRead = 0;
-      employeeDataList = TestEmployeeDataSet.populateEmployeeData();
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if ((recordsRead++) >= split.getLength()) {
-        return false;
-      }
-      employeeListIndex++;
-      KV<String, String> employeeDetails = employeeDataList.get((int) employeeListIndex);
-      String empData[] = employeeDetails.getValue().split("_");
-      // Updating the same key and value objects with new employee data.
-      currentKey.set(employeeDetails.getKey());
-      currentValue.setEmpName(empData[0]);
-      currentValue.setEmpAddress(empData[1]);
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
deleted file mode 100644
index 4a8fe95..0000000
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.beam.sdk.io.hadoop.inputformat;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.io.Text;
-/**
- * Test Utils used in {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat} for
- * computing splits.
- */
-public class TestEmployeeDataSet {
-  public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;
-  public static final long NUMBER_OF_SPLITS = 3L;
-  private static final List<KV<String, String>> data = new ArrayList<KV<String, String>>();
-
-  /**
-   * Returns a list of employee details. Employee details are available in the form of {@link KV}
-   * in which the key indicates the employee id and the value contains employee details such as
-   * name and address separated by '_'. This is the data input to {@link EmployeeInputFormat} and
-   * {@link ReuseObjectsEmployeeInputFormat}.
-   */
-  public static List<KV<String, String>> populateEmployeeData() {
-    if (!data.isEmpty()) {
-      return data;
-    }
-    data.add(KV.of("0", "Alex_US"));
-    data.add(KV.of("1", "John_UK"));
-    data.add(KV.of("2", "Tom_UK"));
-    data.add(KV.of("3", "Nick_UAE"));
-    data.add(KV.of("4", "Smith_IND"));
-    data.add(KV.of("5", "Taylor_US"));
-    data.add(KV.of("6", "Gray_UK"));
-    data.add(KV.of("7", "James_UAE"));
-    data.add(KV.of("8", "Jordan_IND"));
-    data.add(KV.of("9", "Leena_UK"));
-    data.add(KV.of("10", "Zara_UAE"));
-    data.add(KV.of("11", "Talia_IND"));
-    data.add(KV.of("12", "Rose_UK"));
-    data.add(KV.of("13", "Kelvin_UAE"));
-    data.add(KV.of("14", "Goerge_IND"));
-    return data;
-  }
-
-  /**
-   * This is a helper function used in unit tests for validating data against data read using
-   * {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat}.
-   */
-  public static List<KV<Text, Employee>> getEmployeeData() {
-    return Lists.transform((data.isEmpty() ? populateEmployeeData() : data),
-        new Function<KV<String, String>, KV<Text, Employee>>() {
-          @Override
-          public KV<Text, Employee> apply(KV<String, String> input) {
-            String[] empData = input.getValue().split("_");
-            return KV.of(new Text(input.getKey()), new Employee(empData[0], empData[1]));
-          }
-        });
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/README.md b/sdks/java/io/hadoop/README.md
new file mode 100644
index 0000000..d91f019
--- /dev/null
+++ b/sdks/java/io/hadoop/README.md
@@ -0,0 +1,167 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Hadoop InputFormat IO
+
+A HadoopInputFormatIO is a Transform for reading data from any source which
+implements Hadoop InputFormat, for example Cassandra, Elasticsearch, HBase, Redis, Postgres, etc.
+
+HadoopInputFormatIO has to make several performance trade-offs in connecting to InputFormat, so if there is another Beam IO Transform specifically for connecting to your data source of choice, we would recommend using that one, but this IO Transform allows you to connect to many data sources that do not yet have a Beam IO Transform.
+
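+To use HadoopInputFormatIO, add a dependency on this module to your project. The Maven snippet below
+is a minimal sketch using the coordinates from this module's pom.xml; the version shown is the
+snapshot at the time of this commit, so adjust it to match the Beam release you use:
+
+```xml
+<dependency>
+  <groupId>org.apache.beam</groupId>
+  <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
+  <version>0.7.0-SNAPSHOT</version>
+</dependency>
+```
+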
+You will need to pass a Hadoop Configuration with parameters specifying how the read will occur. Many properties of the Configuration are optional, and some are required for certain InputFormat classes, but the following properties must be set for all InputFormats:
+
+* `mapreduce.job.inputformat.class`: The InputFormat class used to connect to your data source of choice.
+* `key.class`: The key class returned by the InputFormat in `mapreduce.job.inputformat.class`.
+* `value.class`: The value class returned by the InputFormat in `mapreduce.job.inputformat.class`.
+
+For example:
+```java
+Configuration myHadoopConfiguration = new Configuration(false);
+// Set Hadoop InputFormat, key and value class in configuration
+myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
+  InputFormat.class);
+myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
+myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
+```
+
+You will need to check to see if the key and value classes output by the InputFormat have a Beam Coder available. If not, you can use withKeyTranslation/withValueTranslation to specify a method transforming instances of those classes into another class that is supported by a Beam Coder. These settings are optional and you don't need to specify translation for both key and value.
+
+For example:
+```java
+SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
+new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
+  public MyKeyClass apply(InputFormatKeyClass input) {
+  // ...logic to transform InputFormatKeyClass to MyKeyClass
+  }
+};
+SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
+new SimpleFunction<InputFormatValueClass, MyValueClass>() {
+  public MyValueClass apply(InputFormatValueClass input) {
+  // ...logic to transform InputFormatValueClass to MyValueClass
+  }
+};
+```
+
+### Reading using Hadoop InputFormat IO
+
+Read data with only the Hadoop configuration:
+
+```java
+Pipeline p = ...; // Create pipeline.
+p.apply("read",
+  HadoopInputFormatIO.<InputFormatKeyClass, InputFormatValueClass>read()
+  .withConfiguration(myHadoopConfiguration));
+```
+
+Read data with the configuration and a key translation (example scenario: a Beam Coder is not
+available for the key class, hence key translation is required):
+
+```java
+p.apply("read",
+  HadoopInputFormatIO.<MyKeyClass, InputFormatValueClass>read()
+  .withConfiguration(myHadoopConfiguration)
+  .withKeyTranslation(myOutputKeyType));
+```
+
+Read data with the configuration and a value translation (example scenario: a Beam Coder is not
+available for the value class, hence value translation is required):
+
+```java
+p.apply("read",
+  HadoopInputFormatIO.<InputFormatKeyClass, MyValueClass>read()
+  .withConfiguration(myHadoopConfiguration)
+  .withValueTranslation(myOutputValueType));
+```
+
+Read data with the configuration, key translation and value translation (example scenario: Beam
+Coders are not available for either the key class or the value class of the InputFormat, hence
+both key and value translation are required):
+
+```java
+p.apply("read",
+  HadoopInputFormatIO.<MyKeyClass, MyValueClass>read()
+  .withConfiguration(myHadoopConfiguration)
+  .withKeyTranslation(myOutputKeyType)
+  .withValueTranslation(myOutputValueType));
+```
+
+# Examples for specific InputFormats
+
+### Cassandra - CqlInputFormat
+
+To read data from Cassandra, org.apache.cassandra.hadoop.cql3.CqlInputFormat
+can be used, which needs the following properties to be set.
+
+Create Cassandra Hadoop configuration as follows:
+
+```java
+Configuration cassandraConf = new Configuration();
+cassandraConf.set("cassandra.input.thrift.port", "9160");
+cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
+cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
+cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
+cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
+cassandraConf.setClass("key.class", java.lang.Long Long.class, Object.class);
+cassandraConf.setClass("value.class", com.datastax.driver.core.Row Row.class, Object.class);
+cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat CqlInputFormat.class, InputFormat.class);
+```
+
+Call Read transform as follows:
+
+```java
+PCollection<KV<Long, String>> cassandraData =
+  p.apply("read",
+  HadoopInputFormatIO.<Long, String>read()
+  .withConfiguration(cassandraConf)
+  .withValueTranslation(cassandraOutputValueType));
+```
+
+The CqlInputFormat key class is java.lang.Long, which has a Beam Coder. The CqlInputFormat value class is com.datastax.driver.core.Row, which does not have a Beam Coder. Rather than write a new coder, you can provide your own translation method as follows:
+
+```java
+SimpleFunction<Row, String> cassandraOutputValueType =
+  new SimpleFunction<Row, String>() {
+    @Override
+    public String apply(Row row) {
+      return row.getString("myColName");
+    }
+  };
+```
+ 
+### Elasticsearch - EsInputFormat
+ 
+To read data from Elasticsearch, org.elasticsearch.hadoop.mr.EsInputFormat can be used, which needs the following properties to be set.
+ 
+Create ElasticSearch Hadoop configuration as follows:
+
+```java
+Configuration elasticSearchConf = new Configuration();
+elasticSearchConf.set("es.nodes", ElasticsearchHostIp);
+elasticSearchConf.set("es.port", "9200");
+elasticSearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
+elasticSearchConf.setClass("key.class", org.apache.hadoop.io.Text Text.class, Object.class);
+elasticSearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable.class, Object.class);
+elasticSearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat.class, InputFormat.class);
+```
+
+Call Read transform as follows:
+
+```java
+PCollection<KV<Text, LinkedMapWritable>> elasticData = p.apply("read",
+  HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticSearchConf));
+```
+
+The org.elasticsearch.hadoop.mr.EsInputFormat key class is
+org.apache.hadoop.io.Text and the value class is org.elasticsearch.hadoop.mr.LinkedMapWritable. Both key and value classes have Beam Coders.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
new file mode 100644
index 0000000..9558ecd
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format</name>
+  <description>IO to read data from data sources which implement Hadoop Input Format.</description>
+
+  <properties>
+    <log4j.core.version>2.6.2</log4j.core.version>
+    <hadoop.common.version>2.7.0</hadoop.common.version>
+    <guava.version>19.0</guava.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.common.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.common.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${log4j.core.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file