You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/04 09:19:14 UTC
flink git commit: [FLINK-2918] [api-extending] Add methods to read
Hadoop SequenceFiles.
Repository: flink
Updated Branches:
refs/heads/master 8f74718b1 -> 30a832ec2
[FLINK-2918] [api-extending] Add methods to read Hadoop SequenceFiles.
This closes #1299
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30a832ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30a832ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30a832ec
Branch: refs/heads/master
Commit: 30a832ec2f581d463ab79ca16c275d01df6931f8
Parents: 8f74718
Author: smarthi <sm...@apache.org>
Authored: Sun Oct 25 08:42:31 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 3 23:42:20 2015 +0100
----------------------------------------------------------------------
docs/apis/programming_guide.md | 32 +++-
.../flink/api/java/ExecutionEnvironment.java | 172 ++++++++++---------
.../java/hadoop/mapred/HadoopInputFormat.java | 6 +-
.../hadoop/mapreduce/HadoopInputFormat.java | 4 +
.../flink/api/scala/ExecutionEnvironment.scala | 23 ++-
.../scala/hadoop/mapred/HadoopInputFormat.scala | 4 +
.../hadoop/mapreduce/HadoopInputFormat.scala | 4 +
7 files changed, 153 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 102b137..8c96980 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1726,6 +1726,13 @@ File-based:
- `readFileOfPrimitives(path, delimiter, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
delimited primitive data types such as `String` or `Integer` using the given delimiter.
+
+- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads the file at the specified
+ path using the given FileInputFormat, Key class and Value class, returning the records as `Tuple2<Key, Value>`.
+
+- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads the file at the specified path as a
+ Hadoop SequenceFile (using SequenceFileInputFormat) with the given Key class and Value class, returning the records as `Tuple2<Key, Value>`.
+
Collection-based:
@@ -1741,7 +1748,7 @@ Collection-based:
- `fromParallelCollection(SplittableIterator, Class)` - Creates a data set from an iterator, in
parallel. The class specifies the data type of the elements returned by the iterator.
-- `generateSequence(from, to)` - Generates the squence of numbers in the given interval, in
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
parallel.
Generic:
@@ -1772,9 +1779,16 @@ DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
- .pojoType(Person.class, "name", "age", "zipcode");
+ .pojoType(Person.class, "name", "age", "zipcode");
+
-// create a set from some given elements
+// read a file from the specified path of type TextInputFormat
+DataSet<Tuple2<LongWritable, Text>> tuples = env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+
+// read a file from the specified path of type SequenceFileInputFormat
+DataSet<Tuple2<IntWritable, Text>> tuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+
+// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
// generate a number sequence
@@ -1862,6 +1876,12 @@ File-based:
- `readFileOfPrimitives(path, delimiter)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
delimited primitive data types such as `String` or `Integer` using the given delimiter.
+
+- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads the file at the specified
+ path using the given FileInputFormat, Key class and Value class, returning the records as `(Key, Value)` tuples.
+
+- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads the file at the specified path as a
+ Hadoop SequenceFile (using SequenceFileInputFormat) with the given Key class and Value class, returning the records as `(Key, Value)` tuples.
Collection-based:
@@ -1923,6 +1943,12 @@ val values = env.fromElements("Foo", "bar", "foobar", "fubar")
val numbers = env.generateSequence(1, 10000000);
+
+// read a file from the specified path of type TextInputFormat
+val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
+
+// read a file from the specified path of type SequenceFileInputFormat
+val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
{% endhighlight %}
#### Configuring CSV Parsing
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5405d4e..26121ad 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -83,12 +83,12 @@ import com.google.common.base.Preconditions;
* and to interact with the outside world (data access).
* <p>
* Please note that the execution environment needs strong type information for the input and return types
- * of all operations that are executed. This means that the environments needs to know that the return
+ * of all operations that are executed. This means that the environment needs to know that the return
* value of an operation is for example a Tuple of String and Integer.
* Because the Java compiler throws much of the generic type information away, most methods attempt to re-
* obtain that information using reflection. In certain cases, it may be necessary to manually supply that
* information to some of the methods.
- *
+ *
* @see LocalEnvironment
* @see RemoteEnvironment
*/
@@ -96,18 +96,18 @@ public abstract class ExecutionEnvironment {
/** The logger used by the environment and its subclasses */
protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
-
+
/** The environment of the context (local by default, cluster if invoked through command line) */
private static ExecutionEnvironmentFactory contextEnvironmentFactory;
-
+
/** The default parallelism used by local environments */
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
-
+
// --------------------------------------------------------------------------------------------
- private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
-
- private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
+ private final List<DataSink<?>> sinks = new ArrayList<>();
+
+ private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<>();
private final ExecutionConfig config = new ExecutionConfig();
@@ -117,14 +117,14 @@ public abstract class ExecutionEnvironment {
/** The ID of the session, defined by this execution environment. Sessions and Jobs are same in
* Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph */
protected JobID jobID;
-
+
/** The session timeout in seconds */
protected long sessionTimeout;
-
+
/** Flag to indicate whether sinks have been cleared in previous executions */
private boolean wasExecuted = false;
-
-
+
+
/**
* Creates a new Execution Environment.
*/
@@ -135,10 +135,10 @@ public abstract class ExecutionEnvironment {
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
-
+
/**
* Gets the config object that defines execution parameters.
- *
+ *
* @return The environment's execution configuration.
*/
public ExecutionConfig getConfig() {
@@ -175,32 +175,32 @@ public abstract class ExecutionEnvironment {
public void setParallelism(int parallelism) {
config.setParallelism(parallelism);
}
-
+
/**
* Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of {@code -1} indicates that the system
* default value (as defined in the configuration) should be used.
- *
+ *
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
config.setNumberOfExecutionRetries(numberOfExecutionRetries);
}
-
+
/**
* Gets the number of times the system will try to re-execute failed tasks. A value
* of {@code -1} indicates that the system default value (as defined in the configuration)
* should be used.
- *
+ *
* @return The number of times the system will try to re-execute failed tasks.
*/
public int getNumberOfExecutionRetries() {
return config.getNumberOfExecutionRetries();
}
-
+
/**
* Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
- *
+ *
* @return The execution result from the latest job execution.
*/
public JobExecutionResult getLastJobExecutionResult(){
@@ -235,7 +235,7 @@ public abstract class ExecutionEnvironment {
/**
* Sets the session timeout to hold the intermediate results of a job. This only
* applies the updated timeout in future executions.
- *
+ *
* @param timeout The timeout, in seconds.
*/
public void setSessionTimeout(long timeout) {
@@ -252,7 +252,7 @@ public abstract class ExecutionEnvironment {
* Gets the session timeout for this environment. The session timeout defines for how long
* after an execution, the job and its intermediate results will be kept for future
* interactions.
- *
+ *
* @return The session timeout, in seconds.
*/
public long getSessionTimeout() {
@@ -313,13 +313,13 @@ public abstract class ExecutionEnvironment {
public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
config.registerTypeWithKryoSerializer(type, serializerClass);
}
-
+
/**
* Registers the given type with the serialization stack. If the type is eventually
* serialized as a POJO, then the type is registered with the POJO serializer. If the
* type ends up being serialized with Kryo, then it will be registered at Kryo to make
* sure that only tags are written.
- *
+ *
* @param type The class of the type to register.
*/
public void registerType(Class<?> type) {
@@ -341,118 +341,118 @@ public abstract class ExecutionEnvironment {
// --------------------------------------------------------------------------------------------
// ---------------------------------- Text Input Format ---------------------------------------
-
+
/**
- * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
* The file will be read with the system's default character set.
- *
+ *
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
- * @return A DataSet that represents the data read from the given file as text lines.
+ * @return A {@link DataSet} that represents the data read from the given file as text lines.
*/
public DataSource<String> readTextFile(String filePath) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
-
- return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
+
+ return new DataSource<>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
}
-
+
/**
- * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
* The {@link java.nio.charset.Charset} with the given name will be used to read the files.
- *
+ *
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
* @param charsetName The name of the character set used to read the file.
- * @return A DataSet that represents the data read from the given file as text lines.
+ * @return A {@link DataSet} that represents the data read from the given file as text lines.
*/
public DataSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setCharsetName(charsetName);
- return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
+ return new DataSource<>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
}
-
+
// -------------------------- Text Input Format With String Value------------------------------
-
+
/**
- * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
* This method is similar to {@link #readTextFile(String)}, but it produces a DataSet with mutable
* {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
* to be less object and garbage collection heavy.
* <p>
* The file will be read with the system's default character set.
- *
+ *
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
- * @return A DataSet that represents the data read from the given file as text lines.
+ * @return A {@link DataSet} that represents the data read from the given file as text lines.
*/
public DataSource<StringValue> readTextFileWithValue(String filePath) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
-
- return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
+
+ return new DataSource<>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<>(StringValue.class), Utils.getCallLocationName());
}
-
+
/**
- * Creates a DataSet that represents the Strings produced by reading the given file line wise.
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
* This method is similar to {@link #readTextFile(String, String)}, but it produces a DataSet with mutable
* {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
* to be less object and garbage collection heavy.
* <p>
* The {@link java.nio.charset.Charset} with the given name will be used to read the files.
- *
+ *
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
* @param charsetName The name of the character set used to read the file.
* @param skipInvalidLines A flag to indicate whether to skip lines that cannot be read with the given character set.
- *
+ *
* @return A DataSet that represents the data read from the given file as text lines.
*/
public DataSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
-
+
TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
format.setCharsetName(charsetName);
format.setSkipInvalidLines(skipInvalidLines);
- return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
+ return new DataSource<>(this, format, new ValueTypeInfo<>(StringValue.class), Utils.getCallLocationName());
}
// ----------------------------------- Primitive Input Format ---------------------------------------
/**
- * Creates a DataSet that represents the primitive type produced by reading the given file line wise.
+ * Creates a {@link DataSet} that represents the primitive type produced by reading the given file line wise.
* This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
* {@link org.apache.flink.api.java.tuple.Tuple1}.
*
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
* @param typeClass The primitive type class to be read.
- * @return A DataSet that represents the data read from the given file as primitive type.
+ * @return A {@link DataSet} that represents the data read from the given file as primitive type.
*/
public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
- return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
+ return new DataSource<>(this, new PrimitiveInputFormat<>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
}
/**
- * Creates a DataSet that represents the primitive type produced by reading the given file in delimited way.
+ * Creates a {@link DataSet} that represents the primitive type produced by reading the given file in delimited way.
* This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
* {@link org.apache.flink.api.java.tuple.Tuple1}.
*
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
* @param delimiter The delimiter of the given file.
* @param typeClass The primitive type class to be read.
- * @return A DataSet that represents the data read from the given file as primitive type.
+ * @return A {@link DataSet} that represents the data read from the given file as primitive type.
*/
public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
- return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
+ return new DataSource<>(this, new PrimitiveInputFormat<>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
}
// ----------------------------------- CSV Input Format ---------------------------------------
-
+
/**
* Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
* define parameters and field types and will eventually produce the DataSet that corresponds to
* the read and parsed CSV input.
- *
+ *
* @param filePath The path of the CSV file.
* @return A CsvReader that can be used to configure the CSV input.
*/
@@ -461,7 +461,7 @@ public abstract class ExecutionEnvironment {
}
// ------------------------------------ File Input Format -----------------------------------------
-
+
public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, String filePath) {
if (inputFormat == null) {
throw new IllegalArgumentException("InputFormat must not be null.");
@@ -469,7 +469,7 @@ public abstract class ExecutionEnvironment {
if (filePath == null) {
throw new IllegalArgumentException("The file path must not be null.");
}
-
+
inputFormat.setFilePath(new Path(filePath));
try {
return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
@@ -480,11 +480,11 @@ public abstract class ExecutionEnvironment {
"'createInput(InputFormat, TypeInformation)' method instead.");
}
}
-
+
// ----------------------------------- Generic Input Format ---------------------------------------
-
+
/**
- * Generic method to create an input DataSet with in {@link InputFormat}. The DataSet will not be
+ * Generic method to create an input {@link DataSet} with an {@link InputFormat}. The DataSet will not be
* immediately created - instead, this method returns a DataSet that will be lazily created from
* the input format once the program is executed.
* <p>
@@ -493,17 +493,17 @@ public abstract class ExecutionEnvironment {
* by reflection, unless the input format implements the {@link ResultTypeQueryable} interface.
* In the latter case, this method will invoke the {@link ResultTypeQueryable#getProducedType()}
* method to determine data type produced by the input format.
- *
+ *
* @param inputFormat The input format used to create the data set.
- * @return A DataSet that represents the data created by the input format.
- *
+ * @return A {@link DataSet} that represents the data created by the input format.
+ *
* @see #createInput(InputFormat, TypeInformation)
*/
public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat) {
if (inputFormat == null) {
throw new IllegalArgumentException("InputFormat must not be null.");
}
-
+
try {
return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
}
@@ -515,29 +515,29 @@ public abstract class ExecutionEnvironment {
}
/**
- * Generic method to create an input DataSet with in {@link InputFormat}. The DataSet will not be
- * immediately created - instead, this method returns a DataSet that will be lazily created from
+ * Generic method to create an input DataSet with an {@link InputFormat}. The {@link DataSet} will not be
+ * immediately created - instead, this method returns a {@link DataSet} that will be lazily created from
* the input format once the program is executed.
* <p>
- * The data set is typed to the given TypeInformation. This method is intended for input formats that
+ * The {@link DataSet} is typed to the given TypeInformation. This method is intended for input formats
* where the return type cannot be determined by reflection analysis, and that do not implement the
* {@link ResultTypeQueryable} interface.
- *
+ *
* @param inputFormat The input format used to create the data set.
- * @return A DataSet that represents the data created by the input format.
- *
+ * @return A {@link DataSet} that represents the data created by the input format.
+ *
* @see #createInput(InputFormat)
*/
public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat, TypeInformation<X> producedType) {
if (inputFormat == null) {
throw new IllegalArgumentException("InputFormat must not be null.");
}
-
+
if (producedType == null) {
throw new IllegalArgumentException("Produced type information must not be null.");
}
-
- return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
+
+ return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName());
}
// ----------------------------------- Hadoop Input Format ---------------------------------------
@@ -555,6 +555,14 @@ public abstract class ExecutionEnvironment {
}
/**
+ * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}.
+ * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+ */
+ public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+ return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
+ }
+
+ /**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
* {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
*/
@@ -566,7 +574,7 @@ public abstract class ExecutionEnvironment {
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
*/
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
- HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+ HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
return this.createInput(hadoopInputFormat);
}
@@ -596,7 +604,7 @@ public abstract class ExecutionEnvironment {
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
- org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapreduceInputFormat, key, value, job);
+ org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
return this.createInput(hadoopInputFormat);
}
@@ -631,7 +639,7 @@ public abstract class ExecutionEnvironment {
TypeInformation<X> type = TypeExtractor.getForObject(firstValue);
CollectionInputFormat.checkCollection(data, type.getTypeClass());
- return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer(config)), type, Utils.getCallLocationName());
+ return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, Utils.getCallLocationName());
}
/**
@@ -652,7 +660,7 @@ public abstract class ExecutionEnvironment {
private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
CollectionInputFormat.checkCollection(data, type.getTypeClass());
- return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer(config)), type, callLocationName);
+ return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName);
}
/**
@@ -691,7 +699,7 @@ public abstract class ExecutionEnvironment {
* @see #fromCollection(Iterator, Class)
*/
public <X> DataSource<X> fromCollection(Iterator<X> data, TypeInformation<X> type) {
- return new DataSource<X>(this, new IteratorInputFormat<X>(data), type, Utils.getCallLocationName());
+ return new DataSource<>(this, new IteratorInputFormat<>(data), type, Utils.getCallLocationName());
}
@@ -761,7 +769,7 @@ public abstract class ExecutionEnvironment {
// private helper for passing different call location names
private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
- return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type, callLocationName);
+ return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName);
}
/**
@@ -855,7 +863,7 @@ public abstract class ExecutionEnvironment {
* @param executable flag indicating whether the file should be executable
*/
public void registerCachedFile(String filePath, String name, boolean executable){
- this.cacheFile.add(new Tuple2<String, DistributedCacheEntry>(name, new DistributedCacheEntry(filePath, executable)));
+ this.cacheFile.add(new Tuple2<>(name, new DistributedCacheEntry(filePath, executable)));
}
/**
@@ -948,7 +956,7 @@ public abstract class ExecutionEnvironment {
}
}
if(typeInfo instanceof CompositeType) {
- List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<GenericTypeInfo<?>>();
+ List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>();
Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
for(GenericTypeInfo<?> gt : genericTypesInComposite) {
Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
index 967f8c4..b0c4c42 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
@@ -42,6 +42,10 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
super(mapredInputFormat, key, value, job);
}
+
+ public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value) {
+ super(mapredInputFormat, key, value, new JobConf());
+ }
@Override
public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
@@ -59,6 +63,6 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
@Override
public TypeInformation<Tuple2<K,V>> getProducedType() {
- return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
+ return new TupleTypeInfo<>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
index f0788c7..4d50851 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
@@ -41,6 +41,10 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
super(mapreduceInputFormat, key, value, job);
}
+ public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value) throws IOException {
+ super(mapreduceInputFormat, key, value, Job.getInstance());
+ }
+
@Override
public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
if(!this.fetched) {
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 9238f5c..6381a12 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -17,14 +17,12 @@
*/
package org.apache.flink.api.scala
-import java.util.UUID
-
import com.esotericsoftware.kryo.Serializer
import com.google.common.base.Preconditions
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.{JobID, ExecutionConfig, JobExecutionResult}
import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult, JobID}
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -291,7 +289,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
inputFormat.setCommentPrefix(ignoreComments)
if (quoteCharacter != null) {
- inputFormat.enableQuotedStringParsing(quoteCharacter);
+ inputFormat.enableQuotedStringParsing(quoteCharacter)
}
val classesBuf: ArrayBuffer[Class[_]] = new ArrayBuffer[Class[_]]
@@ -305,12 +303,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
throw new IllegalArgumentException(
"POJO fields must be specified (not null) if output type is a POJO.")
} else {
- for (i <- 0 until pojoFields.length) {
+ for (i <- pojoFields.indices) {
val pos = info.getFieldIndex(pojoFields(i))
if (pos < 0) {
throw new IllegalArgumentException(
"Field \"" + pojoFields(i) + "\" not part of POJO type " +
- info.getTypeClass.getCanonicalName);
+ info.getTypeClass.getCanonicalName)
}
classesBuf += info.getPojoFieldAt(pos).getTypeInformation().getTypeClass
}
@@ -424,6 +422,19 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
+ * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]].
+ * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+ */
+ def readSequenceFile[K, V](
+ key: Class[K],
+ value: Class[V],
+ inputPath: String)
+ (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+ readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
+ key, value, inputPath)
+ }
+
+ /**
* Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
*/
def createHadoopInput[K, V](
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
index d03e433..0e23cab 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -27,6 +27,10 @@ class HadoopInputFormat[K, V](
job: JobConf)
extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
+ def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
+ this(mapredInputFormat, keyClass, valueClass, new JobConf)
+ }
+
def nextRecord(reuse: (K, V)): (K, V) = {
if (!fetched) {
fetchNext()
http://git-wip-us.apache.org/repos/asf/flink/blob/30a832ec/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
index 9c94f29..9079db5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
@@ -28,6 +28,10 @@ class HadoopInputFormat[K, V](
job: Job)
extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
+ def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
+ this(mapredInputFormat, keyClass, valueClass, Job.getInstance())
+ }
+
def nextRecord(reuse: (K, V)): (K, V) = {
if (!fetched) {
fetchNext()