Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/04 09:19:14 UTC

flink git commit: [FLINK-2918] [api-extending] Add methods to read Hadoop SequenceFiles.

Repository: flink
Updated Branches:
  refs/heads/master 8f74718b1 -> 30a832ec2


[FLINK-2918] [api-extending] Add methods to read Hadoop SequenceFiles.

This closes #1299


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30a832ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30a832ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30a832ec

Branch: refs/heads/master
Commit: 30a832ec2f581d463ab79ca16c275d01df6931f8
Parents: 8f74718
Author: smarthi <sm...@apache.org>
Authored: Sun Oct 25 08:42:31 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 3 23:42:20 2015 +0100

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  32 +++-
 .../flink/api/java/ExecutionEnvironment.java    | 172 ++++++++++---------
 .../java/hadoop/mapred/HadoopInputFormat.java   |   6 +-
 .../hadoop/mapreduce/HadoopInputFormat.java     |   4 +
 .../flink/api/scala/ExecutionEnvironment.scala  |  23 ++-
 .../scala/hadoop/mapred/HadoopInputFormat.scala |   4 +
 .../hadoop/mapreduce/HadoopInputFormat.scala    |   4 +
 7 files changed, 153 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 102b137..8c96980 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1726,6 +1726,13 @@ File-based:
    
 - `readFileOfPrimitives(path, delimiter, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
    delimited primitive data types such as `String` or `Integer` using the given delimiter.
+   
+- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads a file from the
+   specified path with the given FileInputFormat, Key class, and Value class, and returns the records as Tuple2<Key, Value>.
+
+- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads a file from the specified path
+   using a SequenceFileInputFormat with the given Key class and Value class, and returns the records as Tuple2<Key, Value>.
+ 
 
 Collection-based:
 
@@ -1741,7 +1748,7 @@ Collection-based:
 - `fromParallelCollection(SplittableIterator, Class)` - Creates a data set from an iterator, in
   parallel. The class specifies the data type of the elements returned by the iterator.
 
-- `generateSequence(from, to)` - Generates the squence of numbers in the given interval, in
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
   parallel.
 
 Generic:
@@ -1772,9 +1779,16 @@ DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file
 
 // read a CSV file with three fields into a POJO (Person.class) with corresponding fields
 DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
-                         .pojoType(Person.class, "name", "age", "zipcode");                         
+                         .pojoType(Person.class, "name", "age", "zipcode");
+
 
-// create a set from some given elements
+// read a file from the specified path using the Hadoop TextInputFormat
+DataSet<Tuple2<LongWritable, Text>> hadoopTuples = env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+
+// read a SequenceFile from the specified path
+DataSet<Tuple2<IntWritable, Text>> sequenceTuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+
+// creates a set from some given elements
 DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
 
 // generate a number sequence
@@ -1862,6 +1876,12 @@ File-based:
 
 - `readFileOfPrimitives(path, delimiter)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
   delimited primitive data types such as `String` or `Integer` using the given delimiter.
+  
+- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads a file from the
+   specified path with the given FileInputFormat, Key class, and Value class, and returns the records as Tuple2<Key, Value>.
+
+- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads a file from the specified path
+   using a SequenceFileInputFormat with the given Key class and Value class, and returns the records as Tuple2<Key, Value>.
 
 Collection-based:
 
@@ -1923,6 +1943,12 @@ val values = env.fromElements("Foo", "bar", "foobar", "fubar")
 val numbers = env.generateSequence(1, 10000000);
+
+// read a file from the specified path using the Hadoop TextInputFormat
+val hadoopTuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
+
+// read a SequenceFile from the specified path
+val sequenceTuples = env.readSequenceFile(classOf[IntWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
 {% endhighlight %}
 
 
 #### Configuring CSV Parsing
 

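For illustration, a minimal self-contained Java sketch of the two read methods documented above. The class name, the HDFS paths, and the IntWritable/Text key/value types of the SequenceFile are illustrative placeholders:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopReadExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// readHadoopFile wraps a mapred FileInputFormat; records arrive as Tuple2<key, value>
		DataSet<Tuple2<LongWritable, Text>> textTuples =
				env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class,
						"hdfs://nnHost:nnPort/path/to/file");

		// readSequenceFile creates the JobConf and the SequenceFileInputFormat internally
		DataSet<Tuple2<IntWritable, Text>> seqTuples =
				env.readSequenceFile(IntWritable.class, Text.class,
						"hdfs://nnHost:nnPort/path/to/seqfile");

		textTuples.print();
		seqTuples.print();
	}
}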
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5405d4e..26121ad 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -83,12 +83,12 @@ import com.google.common.base.Preconditions;
  * and to interact with the outside world (data access).
  * <p>
  * Please note that the execution environment needs strong type information for the input and return types
- * of all operations that are executed. This means that the environments needs to know that the return 
+ * of all operations that are executed. This means that the environment needs to know that the return
  * value of an operation is for example a Tuple of String and Integer.
  * Because the Java compiler throws much of the generic type information away, most methods attempt to re-
  * obtain that information using reflection. In certain cases, it may be necessary to manually supply that
  * information to some of the methods.
- * 
+ *
  * @see LocalEnvironment
  * @see RemoteEnvironment
  */
@@ -96,18 +96,18 @@ public abstract class ExecutionEnvironment {
 
 	/** The logger used by the environment and its subclasses */
 	protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
-	
+
 	/** The environment of the context (local by default, cluster if invoked through command line) */
 	private static ExecutionEnvironmentFactory contextEnvironmentFactory;
-	
+
 	/** The default parallelism used by local environments */
 	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
-	
+
 	// --------------------------------------------------------------------------------------------
 
-	private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
-	
-	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
+	private final List<DataSink<?>> sinks = new ArrayList<>();
+
+	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<>();
 
 	private final ExecutionConfig config = new ExecutionConfig();
 
@@ -117,14 +117,14 @@ public abstract class ExecutionEnvironment {
 	/** The ID of the session, defined by this execution environment. Sessions and Jobs are same in
 	 *  Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph */
 	protected JobID jobID;
-	
+
 	/** The session timeout in seconds */
 	protected long sessionTimeout;
-	
+
 	/** Flag to indicate whether sinks have been cleared in previous executions */
 	private boolean wasExecuted = false;
-	
-	
+
+
 	/**
 	 * Creates a new Execution Environment.
 	 */
@@ -135,10 +135,10 @@ public abstract class ExecutionEnvironment {
 	// --------------------------------------------------------------------------------------------
 	//  Properties
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Gets the config object that defines execution parameters.
-	 * 
+	 *
 	 * @return The environment's execution configuration.
 	 */
 	public ExecutionConfig getConfig() {
@@ -175,32 +175,32 @@ public abstract class ExecutionEnvironment {
 	public void setParallelism(int parallelism) {
 		config.setParallelism(parallelism);
 	}
-	
+
 	/**
 	 * Sets the number of times that failed tasks are re-executed. A value of zero
 	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
 	 * default value (as defined in the configuration) should be used.
-	 * 
+	 *
 	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
 	 */
 	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
 		config.setNumberOfExecutionRetries(numberOfExecutionRetries);
 	}
-	
+
 	/**
 	 * Gets the number of times the system will try to re-execute failed tasks. A value
 	 * of {@code -1} indicates that the system default value (as defined in the configuration)
 	 * should be used.
-	 * 
+	 *
 	 * @return The number of times the system will try to re-execute failed tasks.
 	 */
 	public int getNumberOfExecutionRetries() {
 		return config.getNumberOfExecutionRetries();
 	}
-	
+
 	/**
 	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
-	 * 
+	 *
 	 * @return The execution result from the latest job execution.
 	 */
 	public JobExecutionResult getLastJobExecutionResult(){
@@ -235,7 +235,7 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Sets the session timeout to hold the intermediate results of a job. This only
 	 * applies the updated timeout in future executions.
-	 * 
+	 *
 	 * @param timeout The timeout, in seconds.
 	 */
 	public void setSessionTimeout(long timeout) {
@@ -252,7 +252,7 @@ public abstract class ExecutionEnvironment {
 	 * Gets the session timeout for this environment. The session timeout defines for how long
 	 * after an execution, the job and its intermediate results will be kept for future
 	 * interactions.
-	 * 
+	 *
 	 * @return The session timeout, in seconds.
 	 */
 	public long getSessionTimeout() {
@@ -313,13 +313,13 @@ public abstract class ExecutionEnvironment {
 	public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
 		config.registerTypeWithKryoSerializer(type, serializerClass);
 	}
-	
+
 	/**
 	 * Registers the given type with the serialization stack. If the type is eventually
 	 * serialized as a POJO, then the type is registered with the POJO serializer. If the
 	 * type ends up being serialized with Kryo, then it will be registered at Kryo to make
 	 * sure that only tags are written.
-	 *  
+	 *
 	 * @param type The class of the type to register.
 	 */
 	public void registerType(Class<?> type) {
@@ -341,118 +341,118 @@ public abstract class ExecutionEnvironment {
 	// --------------------------------------------------------------------------------------------
 
 	// ---------------------------------- Text Input Format ---------------------------------------
-	
+
 	/**
-	 * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+	 * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
 	 * The file will be read with the system's default character set.
-	 * 
+	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
-	 * @return A DataSet that represents the data read from the given file as text lines.
+	 * @return A {@link DataSet} that represents the data read from the given file as text lines.
 	 */
 	public DataSource<String> readTextFile(String filePath) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
-		
-		return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
+
+		return new DataSource<>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
 	}
-	
+
 	/**
-	 * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+	 * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
 	 * The {@link java.nio.charset.Charset} with the given name will be used to read the files.
-	 * 
+	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @param charsetName The name of the character set used to read the file.
-	 * @return A DataSet that represents the data read from the given file as text lines.
+	 * @return A {@link DataSet} that represents the data read from the given file as text lines.
 	 */
 	public DataSource<String> readTextFile(String filePath, String charsetName) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		format.setCharsetName(charsetName);
-		return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
+		return new DataSource<>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
 	}
-	
+
 	// -------------------------- Text Input Format With String Value------------------------------
-	
+
 	/**
-	 * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+	 * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
 	 * This method is similar to {@link #readTextFile(String)}, but it produces a DataSet with mutable
 	 * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
 	 * to be less object and garbage collection heavy.
 	 * <p>
 	 * The file will be read with the system's default character set.
-	 * 
+	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
-	 * @return A DataSet that represents the data read from the given file as text lines.
+	 * @return A {@link DataSet} that represents the data read from the given file as text lines.
 	 */
 	public DataSource<StringValue> readTextFileWithValue(String filePath) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
-		
-		return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
+
+		return new DataSource<>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<>(StringValue.class), Utils.getCallLocationName());
 	}
-	
+
 	/**
-	 * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+	 * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
 	 * This method is similar to {@link #readTextFile(String, String)}, but it produces a DataSet with mutable
 	 * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
 	 * to be less object and garbage collection heavy.
 	 * <p>
 	 * The {@link java.nio.charset.Charset} with the given name will be used to read the files.
-	 * 
+	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @param charsetName The name of the character set used to read the file.
 	 * @param skipInvalidLines A flag to indicate whether to skip lines that cannot be read with the given character set.
-	 * 
+	 *
 	 * @return A DataSet that represents the data read from the given file as text lines.
 	 */
 	public DataSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
-		
+
 		TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
 		format.setCharsetName(charsetName);
 		format.setSkipInvalidLines(skipInvalidLines);
-		return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
+		return new DataSource<>(this, format, new ValueTypeInfo<>(StringValue.class), Utils.getCallLocationName());
 	}
 
 	// ----------------------------------- Primitive Input Format ---------------------------------------
 
 	/**
-	 * Creates a DataSet that represents the primitive type produced by reading the given file line wise.
+	 * Creates a {@link DataSet} that represents the primitive type produced by reading the given file line wise.
 	 * This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
 	 * {@link org.apache.flink.api.java.tuple.Tuple1}.
 	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @param typeClass The primitive type class to be read.
-	 * @return A DataSet that represents the data read from the given file as primitive type.
+	 * @return A {@link DataSet} that represents the data read from the given file as primitive type.
 	 */
 	public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 
-		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
+		return new DataSource<>(this, new PrimitiveInputFormat<>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
 	}
 
 	/**
-	 * Creates a DataSet that represents the primitive type produced by reading the given file in delimited way.
+	 * Creates a {@link DataSet} that represents the primitive type produced by reading the given file in a delimited way.
 	 * This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
 	 * {@link org.apache.flink.api.java.tuple.Tuple1}.
 	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @param delimiter The delimiter of the given file.
 	 * @param typeClass The primitive type class to be read.
-	 * @return A DataSet that represents the data read from the given file as primitive type.
+	 * @return A {@link DataSet} that represents the data read from the given file as primitive type.
 	 */
 	public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 
-		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
+		return new DataSource<>(this, new PrimitiveInputFormat<>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
 	}
 
 	// ----------------------------------- CSV Input Format ---------------------------------------
-	
+
 	/**
 	 * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
 	 * define parameters and field types and will eventually produce the DataSet that corresponds to
 	 * the read and parsed CSV input.
-	 * 
+	 *
 	 * @param filePath The path of the CSV file.
 	 * @return A CsvReader that can be used to configure the CSV input.
 	 */
@@ -461,7 +461,7 @@ public abstract class ExecutionEnvironment {
 	}
 
 	// ------------------------------------ File Input Format -----------------------------------------
-	
+
 	public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, String filePath) {
 		if (inputFormat == null) {
 			throw new IllegalArgumentException("InputFormat must not be null.");
@@ -469,7 +469,7 @@ public abstract class ExecutionEnvironment {
 		if (filePath == null) {
 			throw new IllegalArgumentException("The file path must not be null.");
 		}
-		
+
 		inputFormat.setFilePath(new Path(filePath));
 		try {
 			return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
@@ -480,11 +480,11 @@ public abstract class ExecutionEnvironment {
 					"'createInput(InputFormat, TypeInformation)' method instead.");
 		}
 	}
-	
+
 	// ----------------------------------- Generic Input Format ---------------------------------------
-	
+
 	/**
-	 * Generic method to create an input DataSet with in {@link InputFormat}. The DataSet will not be
+	 * Generic method to create an input {@link DataSet} with the given {@link InputFormat}. The DataSet will not be
 	 * immediately created - instead, this method returns a DataSet that will be lazily created from
 	 * the input format once the program is executed.
 	 * <p>
@@ -493,17 +493,17 @@ public abstract class ExecutionEnvironment {
 	 * by reflection, unless the input format implements the {@link ResultTypeQueryable} interface.
 	 * In the latter case, this method will invoke the {@link ResultTypeQueryable#getProducedType()}
 	 * method to determine data type produced by the input format.
-	 * 
+	 *
 	 * @param inputFormat The input format used to create the data set.
-	 * @return A DataSet that represents the data created by the input format.
-	 * 
+	 * @return A {@link DataSet} that represents the data created by the input format.
+	 *
 	 * @see #createInput(InputFormat, TypeInformation)
 	 */
 	public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat) {
 		if (inputFormat == null) {
 			throw new IllegalArgumentException("InputFormat must not be null.");
 		}
-		
+
 		try {
 			return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
 		}
@@ -515,29 +515,29 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
-	 * Generic method to create an input DataSet with in {@link InputFormat}. The DataSet will not be
-	 * immediately created - instead, this method returns a DataSet that will be lazily created from
+	 * Generic method to create an input DataSet with the given {@link InputFormat}. The {@link DataSet} will not be
+	 * immediately created - instead, this method returns a {@link DataSet} that will be lazily created from
 	 * the input format once the program is executed.
 	 * <p>
-	 * The data set is typed to the given TypeInformation. This method is intended for input formats that
+	 * The {@link DataSet} is typed to the given TypeInformation. This method is intended for input formats
 	 * where the return type cannot be determined by reflection analysis, and that do not implement the
 	 * {@link ResultTypeQueryable} interface.
-	 * 
+	 *
 	 * @param inputFormat The input format used to create the data set.
-	 * @return A DataSet that represents the data created by the input format.
-	 * 
+	 * @return A {@link DataSet} that represents the data created by the input format.
+	 *
 	 * @see #createInput(InputFormat)
 	 */
 	public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat, TypeInformation<X> producedType) {
 		if (inputFormat == null) {
 			throw new IllegalArgumentException("InputFormat must not be null.");
 		}
-		
+
 		if (producedType == null) {
 			throw new IllegalArgumentException("Produced type information must not be null.");
 		}
-		
-		return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
+
+		return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName());
 	}
 
 	// ----------------------------------- Hadoop Input Format ---------------------------------------
@@ -555,6 +555,14 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
+	 * Creates a {@link DataSet} from a {@link org.apache.hadoop.mapred.SequenceFileInputFormat}.
+	 * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
+	}
+
+	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
 	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
 	 */
@@ -566,7 +574,7 @@ public abstract class ExecutionEnvironment {
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
 	 */
 	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
 
 		return this.createInput(hadoopInputFormat);
 	}
@@ -596,7 +604,7 @@ public abstract class ExecutionEnvironment {
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
 	 */
 	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapreduceInputFormat, key, value, job);
+		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
 
 		return this.createInput(hadoopInputFormat);
 	}
@@ -631,7 +639,7 @@ public abstract class ExecutionEnvironment {
 		
 		TypeInformation<X> type = TypeExtractor.getForObject(firstValue);
 		CollectionInputFormat.checkCollection(data, type.getTypeClass());
-		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer(config)), type, Utils.getCallLocationName());
+		return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -652,7 +660,7 @@ public abstract class ExecutionEnvironment {
 	
 	private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
 		CollectionInputFormat.checkCollection(data, type.getTypeClass());
-		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer(config)), type, callLocationName);
+		return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName);
 	}
 	
 	/**
@@ -691,7 +699,7 @@ public abstract class ExecutionEnvironment {
 	 * @see #fromCollection(Iterator, Class)
 	 */
 	public <X> DataSource<X> fromCollection(Iterator<X> data, TypeInformation<X> type) {
-		return new DataSource<X>(this, new IteratorInputFormat<X>(data), type, Utils.getCallLocationName());
+		return new DataSource<>(this, new IteratorInputFormat<>(data), type, Utils.getCallLocationName());
 	}
 	
 	
@@ -761,7 +769,7 @@ public abstract class ExecutionEnvironment {
 	
 	// private helper for passing different call location names
 	private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
-		return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type, callLocationName);
+		return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName);
 	}
 	
 	/**
@@ -855,7 +863,7 @@ public abstract class ExecutionEnvironment {
 	 * @param executable flag indicating whether the file should be executable
 	 */
 	public void registerCachedFile(String filePath, String name, boolean executable){
-		this.cacheFile.add(new Tuple2<String, DistributedCacheEntry>(name, new DistributedCacheEntry(filePath, executable)));
+		this.cacheFile.add(new Tuple2<>(name, new DistributedCacheEntry(filePath, executable)));
 	}
 	
 	/**
@@ -948,7 +956,7 @@ public abstract class ExecutionEnvironment {
 					}
 				}
 				if(typeInfo instanceof CompositeType) {
-					List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<GenericTypeInfo<?>>();
+					List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>();
 					Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
 					for(GenericTypeInfo<?> gt : genericTypesInComposite) {
 						Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
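A usage sketch of the new readSequenceFile shorthand above, reusing the placeholder path and Writable key/value types from the documentation example; env is assumed to be an ExecutionEnvironment, and readSequenceFile declares IOException:

// shorthand added here
DataSet<Tuple2<IntWritable, Text>> seq =
		env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/seqfile");

// the explicit form it delegates to
DataSet<Tuple2<IntWritable, Text>> sameSeq =
		env.readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<IntWritable, Text>(),
				IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/seqfile");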

http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
index 967f8c4..b0c4c42 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
@@ -42,6 +42,10 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
 	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
 		super(mapredInputFormat, key, value, job);
 	}
+
+	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value) {
+		super(mapredInputFormat, key, value, new JobConf());
+	}
 	
 	@Override
 	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
@@ -59,6 +63,6 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
 	
 	@Override
 	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
+		return new TupleTypeInfo<>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
 	}
 }
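A minimal sketch of the new JobConf-free constructor above. It assumes the HadoopInputFormatBase superclass exposes a getJobConf() accessor for configuring the input path; the path and key/value types are placeholders, and env is an ExecutionEnvironment:

// the new constructor creates a fresh JobConf internally
HadoopInputFormat<LongWritable, Text> hadoopIF =
		new HadoopInputFormat<>(new org.apache.hadoop.mapred.TextInputFormat(), LongWritable.class, Text.class);

// the input path still has to be set on that JobConf (getJobConf() assumed here)
org.apache.hadoop.mapred.FileInputFormat.addInputPath(
		hadoopIF.getJobConf(), new org.apache.hadoop.fs.Path("hdfs://nnHost:nnPort/path/to/file"));

DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopIF);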

http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
index f0788c7..4d50851 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
@@ -41,6 +41,10 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
 		super(mapreduceInputFormat, key, value, job);
 	}
 
+	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value) throws IOException {
+		super(mapreduceInputFormat, key, value, Job.getInstance());
+	}
+
 	@Override
 	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
 		if(!this.fetched) {

http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 9238f5c..6381a12 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -17,14 +17,12 @@
  */
 package org.apache.flink.api.scala
 
-import java.util.UUID
-
 import com.esotericsoftware.kryo.Serializer
 import com.google.common.base.Preconditions
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.{JobID, ExecutionConfig, JobExecutionResult}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult, JobID}
 import org.apache.flink.api.java.io._
 import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -291,7 +289,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     inputFormat.setCommentPrefix(ignoreComments)
 
     if (quoteCharacter != null) {
-      inputFormat.enableQuotedStringParsing(quoteCharacter);
+      inputFormat.enableQuotedStringParsing(quoteCharacter)
     }
 
     val classesBuf: ArrayBuffer[Class[_]] = new ArrayBuffer[Class[_]]
@@ -305,12 +303,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
           throw new IllegalArgumentException(
             "POJO fields must be specified (not null) if output type is a POJO.")
         } else {
-          for (i <- 0 until pojoFields.length) {
+          for (i <- pojoFields.indices) {
             val pos = info.getFieldIndex(pojoFields(i))
             if (pos < 0) {
               throw new IllegalArgumentException(
                 "Field \"" + pojoFields(i) + "\" not part of POJO type " +
-                  info.getTypeClass.getCanonicalName);
+                  info.getTypeClass.getCanonicalName)
             }
             classesBuf += info.getPojoFieldAt(pos).getTypeInformation().getTypeClass
           }
@@ -424,6 +422,19 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Creates a [[DataSet]] from a [[org.apache.hadoop.mapred.SequenceFileInputFormat]].
+   * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   */
+  def readSequenceFile[K, V](
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
+      key, value, inputPath)
+  }
+
+  /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
    */
   def createHadoopInput[K, V](

http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
index d03e433..0e23cab 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -27,6 +27,10 @@ class HadoopInputFormat[K, V](
     job: JobConf)
   extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
 
+  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
+    this(mapredInputFormat, keyClass, valueClass, new JobConf)
+  }
+
   def nextRecord(reuse: (K, V)): (K, V) = {
     if (!fetched) {
       fetchNext()

http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
index 9c94f29..9079db5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
@@ -28,6 +28,10 @@ class HadoopInputFormat[K, V](
     job: Job)
   extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
 
+  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
+    this(mapredInputFormat, keyClass, valueClass, Job.getInstance())
+  }
+
   def nextRecord(reuse: (K, V)): (K, V) = {
     if (!fetched) {
       fetchNext()