Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:46 UTC
[16/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-hadoop-compatibility
[FLINK-6711] Activate strict checkstyle for flink-hadoop-compatibility
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab8fe57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab8fe57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab8fe57
Branch: refs/heads/master
Commit: fab8fe57ca7808a8c7dfaee1834a0429217942f2
Parents: b12de1e
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:56:53 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:21 2017 +0200
----------------------------------------------------------------------
.../flink-hadoop-compatibility/pom.xml | 7 +-
.../api/java/typeutils/WritableTypeInfo.java | 23 +++--
.../typeutils/runtime/WritableComparator.java | 65 ++++++------
.../typeutils/runtime/WritableSerializer.java | 54 +++++-----
.../flink/hadoopcompatibility/HadoopInputs.java | 28 +++---
.../flink/hadoopcompatibility/HadoopUtils.java | 6 +-
.../mapred/HadoopMapFunction.java | 59 +++++------
.../mapred/HadoopReduceCombineFunction.java | 75 +++++++-------
.../mapred/HadoopReduceFunction.java | 67 +++++++------
.../mapred/wrapper/HadoopOutputCollector.java | 15 +--
.../wrapper/HadoopTupleUnwrappingIterator.java | 32 +++---
.../scala/HadoopInputs.scala | 6 +-
.../java/typeutils/WritableExtractionTest.java | 38 ++++---
.../java/typeutils/WritableInfoParserTest.java | 10 +-
.../java/typeutils/WritableTypeInfoTest.java | 10 +-
.../typeutils/runtime/StringArrayWritable.java | 36 ++++---
.../runtime/WritableComparatorTest.java | 25 +++--
.../runtime/WritableComparatorUUIDTest.java | 3 +
.../api/java/typeutils/runtime/WritableID.java | 4 +
.../runtime/WritableSerializerTest.java | 28 +++---
.../runtime/WritableSerializerUUIDTest.java | 3 +
.../hadoopcompatibility/HadoopUtilsTest.java | 4 +
.../mapred/HadoopMapFunctionITCase.java | 45 +++++----
.../mapred/HadoopMapredITCase.java | 8 +-
.../HadoopReduceCombineFunctionITCase.java | 69 +++++++++----
.../mapred/HadoopReduceFunctionITCase.java | 61 +++++++----
.../mapred/HadoopTestData.java | 60 +++++------
.../example/HadoopMapredCompatWordCount.java | 70 +++++++------
.../HadoopTupleUnwrappingIteratorTest.java | 100 ++++++++++---------
.../mapreduce/HadoopInputOutputITCase.java | 16 ++-
.../mapreduce/example/WordCount.java | 49 ++++-----
31 files changed, 604 insertions(+), 472 deletions(-)
----------------------------------------------------------------------
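The hunks below are almost entirely mechanical: trailing whitespace is stripped, a space is enforced after "if" and after casts, generic parameter lists get a space after each comma, imports are regrouped, and public classes gain Javadoc comments. Condensed into one hypothetical class (not part of the commit), the recurring whitespace rules look like this:

    import org.apache.flink.api.java.tuple.Tuple2;

    class CheckstyleExample {
        static Tuple2<String, Integer> wrap(Object key, int value) {    // was: Tuple2<String,Integer>
            if (key == null) {                                          // was: if(key == null)
                throw new NullPointerException("key may not be null.");
            }
            String k = (String) key;                                    // was: (String)key
            return new Tuple2<String, Integer>(k, value);
        }
    }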
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml b/flink-connectors/flink-hadoop-compatibility/pom.xml
index 2dee17d..9427e43 100644
--- a/flink-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -19,9 +19,9 @@ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
+
<modelVersion>4.0.0</modelVersion>
-
+
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
@@ -82,9 +82,8 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
-
- </dependencies>
+ </dependencies>
<build>
<plugins>
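The hunk above is cut off before the <plugins> block, so the checkstyle wiring itself is not visible in this excerpt. For orientation only, a stricter ruleset is typically activated in a Maven module roughly like this (the plugin coordinates are real; the configLocation path and the rest of the configuration are assumptions, not taken from this commit):

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-checkstyle-plugin</artifactId>
        <configuration>
            <!-- assumed path; the excerpt does not show the real location -->
            <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
            <failOnViolation>true</failOnViolation>
        </configuration>
        <executions>
            <execution>
                <phase>validate</phase>
                <goals>
                    <goal>check</goal>
                </goals>
            </execution>
        </executions>
    </plugin>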
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 7bcb4bf..cde309b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+
import org.apache.hadoop.io.Writable;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -41,9 +42,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
@Public
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
-
+
private static final long serialVersionUID = 1L;
-
+
private final Class<T> typeClass;
@PublicEvolving
@@ -59,11 +60,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
@Override
@PublicEvolving
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
- if(Comparable.class.isAssignableFrom(typeClass)) {
+ if (Comparable.class.isAssignableFrom(typeClass)) {
return new WritableComparator(sortOrderAscending, typeClass);
}
else {
- throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
+ throw new UnsupportedOperationException("Cannot create Comparator for " + typeClass.getCanonicalName() + ". " +
"Class does not implement Comparable interface.");
}
}
@@ -85,7 +86,7 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
public int getArity() {
return 1;
}
-
+
@Override
@PublicEvolving
public int getTotalFields() {
@@ -109,17 +110,17 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return new WritableSerializer<T>(typeClass);
}
-
+
@Override
public String toString() {
return "WritableType<" + typeClass.getName() + ">";
- }
-
+ }
+
@Override
public int hashCode() {
return typeClass.hashCode();
}
-
+
@Override
public boolean equals(Object obj) {
if (obj instanceof WritableTypeInfo) {
@@ -138,7 +139,7 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
public boolean canEqual(Object obj) {
return obj instanceof WritableTypeInfo;
}
-
+
// --------------------------------------------------------------------------------------------
@PublicEvolving
@@ -150,5 +151,5 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
}
}
-
+
}
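As the createComparator hunk above makes explicit, a Writable can only be used where Flink needs an ordering (keyed sorting, range partitioning) if it also implements Comparable; otherwise an UnsupportedOperationException is thrown. A minimal key type satisfying both contracts (hypothetical, for illustration):

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class MyKey implements Writable, Comparable<MyKey> {

        private int id;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
        }

        @Override
        public int compareTo(MyKey other) {
            return Integer.compare(id, other.id);
        }
    }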
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index 3a95d94..083a56f 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -18,30 +18,35 @@
package org.apache.flink.api.java.typeutils.runtime;
-import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.io.Writable;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.IOException;
+/**
+ * A {@link TypeComparator} for {@link Writable}.
+ * @param <T>
+ */
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
-
+
private static final long serialVersionUID = 1L;
-
+
private Class<T> type;
-
+
private final boolean ascendingComparison;
-
+
private transient T reference;
-
+
private transient T tempReference;
-
+
private transient Kryo kryo;
@SuppressWarnings("rawtypes")
@@ -51,78 +56,78 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
this.type = type;
this.ascendingComparison = ascending;
}
-
+
@Override
public int hash(T record) {
return record.hashCode();
}
-
+
@Override
public void setReference(T toCompare) {
checkKryoInitialized();
reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
}
-
+
@Override
public boolean equalToReference(T candidate) {
return candidate.equals(reference);
}
-
+
@Override
public int compareToReference(TypeComparator<T> referencedComparator) {
T otherRef = ((WritableComparator<T>) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
-
+
@Override
public int compare(T first, T second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
-
+
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
ensureReferenceInstantiated();
ensureTempReferenceInstantiated();
-
+
reference.readFields(firstSource);
tempReference.readFields(secondSource);
-
+
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
-
+
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(type);
}
-
+
@Override
public int getNormalizeKeyLen() {
ensureReferenceInstantiated();
-
+
NormalizableKey<?> key = (NormalizableKey<?>) reference;
return key.getMaxNormalizedKeyLen();
}
-
+
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
-
+
@Override
public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
NormalizableKey<?> key = (NormalizableKey<?>) record;
key.copyNormalizedKey(target, offset, numBytes);
}
-
+
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
-
+
@Override
public TypeComparator<T> duplicate() {
return new WritableComparator<T>(ascendingComparison, type);
@@ -139,28 +144,28 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
public TypeComparator[] getFlatComparators() {
return comparators;
}
-
+
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
-
+
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
-
+
@Override
public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
-
+
// --------------------------------------------------------------------------------------------
-
+
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
@@ -173,13 +178,13 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
this.kryo.register(type);
}
}
-
+
private void ensureReferenceInstantiated() {
if (reference == null) {
reference = InstantiationUtil.instantiate(type, Writable.class);
}
}
-
+
private void ensureTempReferenceInstantiated() {
if (tempReference == null) {
tempReference = InstantiationUtil.instantiate(type, Writable.class);
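Most of the churn in this file is the import regrouping enforced by the strict rules: org.apache.flink imports first, then other third-party imports (Kryo, Hadoop, Objenesis), then java.*, each group separated by a blank line. Schematically, taken from the hunk above:

    import org.apache.flink.api.common.typeutils.TypeComparator;
    import org.apache.flink.util.InstantiationUtil;

    import com.esotericsoftware.kryo.Kryo;
    import org.apache.hadoop.io.Writable;
    import org.objenesis.strategy.StdInstantiatorStrategy;

    import java.io.IOException;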
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 421d7a3..161e65b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
@@ -28,98 +26,102 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.IOException;
+/**
+ * A {@link TypeSerializer} for {@link Writable}.
+ * @param <T>
+ */
@Internal
public final class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
-
+
private static final long serialVersionUID = 1L;
-
+
private final Class<T> typeClass;
-
+
private transient Kryo kryo;
-
+
private transient T copyInstance;
-
+
public WritableSerializer(Class<T> typeClass) {
this.typeClass = typeClass;
}
-
+
@SuppressWarnings("unchecked")
@Override
public T createInstance() {
- if(typeClass == NullWritable.class) {
+ if (typeClass == NullWritable.class) {
return (T) NullWritable.get();
}
return InstantiationUtil.instantiate(typeClass);
}
-
-
@Override
public T copy(T from) {
checkKryoInitialized();
return KryoUtils.copy(from, kryo, this);
}
-
+
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
return KryoUtils.copy(from, reuse, kryo, this);
}
-
+
@Override
public int getLength() {
return -1;
}
-
+
@Override
public void serialize(T record, DataOutputView target) throws IOException {
record.write(target);
}
-
+
@Override
public T deserialize(DataInputView source) throws IOException {
return deserialize(createInstance(), source);
}
-
+
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
reuse.readFields(source);
return reuse;
}
-
+
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
ensureInstanceInstantiated();
copyInstance.readFields(source);
copyInstance.write(target);
}
-
+
@Override
public boolean isImmutableType() {
return false;
}
-
+
@Override
public WritableSerializer<T> duplicate() {
return new WritableSerializer<T>(typeClass);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
private void ensureInstanceInstantiated() {
if (copyInstance == null) {
copyInstance = createInstance();
}
}
-
+
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
@@ -133,12 +135,12 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
}
}
// --------------------------------------------------------------------------------------------
-
+
@Override
public int hashCode() {
return this.typeClass.hashCode();
}
-
+
@Override
public boolean equals(Object obj) {
if (obj instanceof WritableSerializer) {
@@ -175,6 +177,10 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
}
}
+ /**
+ * The config snapshot for this serializer.
+ * @param <T>
+ */
public static final class WritableSerializerConfigSnapshot<T extends Writable>
extends GenericTypeSerializerConfigSnapshot<T> {
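Beyond the Javadoc additions, the serializer's behavior is unchanged: records are written and read through the Writable's own write()/readFields() methods. A minimal round-trip sketch (DataOutputSerializer and DataInputDeserializer are flink-core utilities; this usage is an illustrative assumption, not code from the commit):

    import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
    import org.apache.flink.core.memory.DataInputDeserializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    import org.apache.hadoop.io.Text;

    public class WritableRoundTrip {
        public static void main(String[] args) throws Exception {
            WritableSerializer<Text> serializer = new WritableSerializer<>(Text.class);
            DataOutputSerializer out = new DataOutputSerializer(64);
            serializer.serialize(new Text("hello"), out);
            // deserialize from the bytes just written
            Text copy = serializer.deserialize(
                    new DataInputDeserializer(out.getByteArray(), 0, out.length()));
            System.out.println(copy); // hello
        }
    }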
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
index 9e8a3e4..dd5a74f 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -20,6 +20,7 @@ package org.apache.flink.hadoopcompatibility;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -28,10 +29,10 @@ import java.io.IOException;
/**
* HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
*
- * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
+ * <p>It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
* and {@link org.apache.hadoop.mapreduce.InputFormat}.
*
- * Key value pairs produced by the Hadoop InputFormats are converted into Flink
+ * <p>Key value pairs produced by the Hadoop InputFormats are converted into Flink
* {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
* ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
* ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
@@ -46,7 +47,7 @@ public final class HadoopInputs {
*
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
*/
- public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+ public static <K, V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
// set input path in JobConf
org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
// return wrapping InputFormat
@@ -58,7 +59,7 @@ public final class HadoopInputs {
*
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
*/
- public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+ public static <K, V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
}
@@ -67,7 +68,7 @@ public final class HadoopInputs {
*
* @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
*/
- public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+ public static <K, V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
}
@@ -76,7 +77,7 @@ public final class HadoopInputs {
*
* @return A Flink InputFormat that wraps the Hadoop InputFormat.
*/
- public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+ public static <K, V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
}
@@ -85,9 +86,8 @@ public final class HadoopInputs {
*
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
*/
- public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
- {
+ public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
// set input path in Job
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
// return wrapping InputFormat
@@ -99,9 +99,8 @@ public final class HadoopInputs {
*
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
*/
- public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
- {
+ public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
}
@@ -110,9 +109,8 @@ public final class HadoopInputs {
*
* @return A Flink InputFormat that wraps the Hadoop InputFormat.
*/
- public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
- org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
- {
+ public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
+ org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
}
}
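Apart from the <K, V> spacing and brace placement, the factory methods are untouched. Typical usage wraps a Hadoop input into a DataSet of Tuple2 records (a sketch; the path and the key/value classes are assumptions):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class ReadSequenceFile {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // key and value classes must match what the SequenceFile was written with
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                    HadoopInputs.readSequenceFile(LongWritable.class, Text.class,
                            "hdfs:///tmp/input.seq")); // hypothetical path
            input.print();
        }
    }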
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
index 97ca329..738e2f8 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -18,8 +18,9 @@
package org.apache.flink.hadoopcompatibility;
-import org.apache.commons.cli.Option;
import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.apache.commons.cli.Option;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
@@ -31,7 +32,7 @@ import java.util.Map;
*/
public class HadoopUtils {
/**
- * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}
+ * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}.
*
* @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
* @return A {@link ParameterTool}
@@ -49,4 +50,3 @@ public class HadoopUtils {
}
}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
index ba8aa90..5b679fe 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -18,68 +18,69 @@
package org.apache.flink.hadoopcompatibility.mapred;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
+
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
/**
- * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
*/
@SuppressWarnings("rawtypes")
@Public
-public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
- implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
+ implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
- private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
+ private transient Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;
private transient JobConf jobConf;
- private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
+ private transient HadoopOutputCollector<KEYOUT, VALUEOUT> outputCollector;
private transient Reporter reporter;
-
+
/**
* Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
- *
+ *
* @param hadoopMapper The Hadoop Mapper to wrap.
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
this(hadoopMapper, new JobConf());
}
-
+
/**
* Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
* The Hadoop Mapper is configured with the provided JobConf.
- *
+ *
* @param hadoopMapper The Hadoop Mapper to wrap.
* @param conf The JobConf that is used to configure the Hadoop Mapper.
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
- if(hadoopMapper == null) {
+ if (hadoopMapper == null) {
throw new NullPointerException("Mapper may not be null.");
}
- if(conf == null) {
+ if (conf == null) {
throw new NullPointerException("JobConf may not be null.");
}
-
+
this.mapper = hadoopMapper;
this.jobConf = conf;
}
@@ -88,13 +89,13 @@ public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.mapper.configure(jobConf);
-
+
this.reporter = new HadoopDummyReporter();
this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
@Override
- public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+ public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
throws Exception {
outputCollector.setFlinkCollector(out);
mapper.map(value.f0, value.f1, outputCollector, reporter);
@@ -102,15 +103,15 @@ public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
@SuppressWarnings("unchecked")
@Override
- public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+ public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
- Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
-
+ Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
+
final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
- return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
+ return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
-
+
/**
* Custom serialization methods.
* @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
@@ -122,10 +123,10 @@ public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
- Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass =
- (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+ Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperClass =
+ (Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
mapper = InstantiationUtil.instantiate(mapperClass);
-
+
jobConf = new JobConf();
jobConf.readFields(in);
}
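The wrapper is applied with flatMap. A condensed tokenizer in the style of the HadoopMapredCompatWordCount example listed in the diffstat (the Tokenizer class itself is hypothetical):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    import java.io.IOException;

    public final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
            for (String token : value.toString().toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    out.collect(new Text(token), new LongWritable(1L));
                }
            }
        }

        @Override
        public void configure(JobConf conf) {}

        @Override
        public void close() throws IOException {}
    }

    // usage:
    // DataSet<Tuple2<Text, LongWritable>> words = text.flatMap(
    //     new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()));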
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index c1acc2b..fd0d37d 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -18,81 +18,82 @@
package org.apache.flink.hadoopcompatibility.mapred;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
+
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
/**
* This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
*/
@SuppressWarnings("rawtypes")
@Public
-public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
- implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
- ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
+ implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
+ ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
- private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
- private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+ private transient Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
+ private transient Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> combiner;
private transient JobConf jobConf;
-
+
private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
- private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
- private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+ private transient HadoopOutputCollector<KEYOUT, VALUEOUT> reduceCollector;
+ private transient HadoopOutputCollector<KEYIN, VALUEIN> combineCollector;
private transient Reporter reporter;
/**
* Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
- *
+ *
* @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
* @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
*/
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
- Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+ Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner) {
this(hadoopReducer, hadoopCombiner, new JobConf());
}
-
+
/**
* Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
- *
+ *
* @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
* @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
* @param conf The JobConf that is used to configure both Hadoop Reducers.
*/
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
- Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
- if(hadoopReducer == null) {
+ Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner, JobConf conf) {
+ if (hadoopReducer == null) {
throw new NullPointerException("Reducer may not be null.");
}
- if(hadoopCombiner == null) {
+ if (hadoopCombiner == null) {
throw new NullPointerException("Combiner may not be null.");
}
- if(conf == null) {
+ if (conf == null) {
throw new NullPointerException("JobConf may not be null.");
}
-
+
this.reducer = hadoopReducer;
this.combiner = hadoopCombiner;
this.jobConf = conf;
@@ -104,7 +105,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
super.open(parameters);
this.reducer.configure(jobConf);
this.combiner.configure(jobConf);
-
+
this.reporter = new HadoopDummyReporter();
Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
@@ -114,7 +115,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
}
@Override
- public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+ public void reduce(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
throws Exception {
reduceCollector.setFlinkCollector(out);
valueIterator.set(values.iterator());
@@ -122,7 +123,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
}
@Override
- public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+ public void combine(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYIN, VALUEIN>> out) throws Exception {
combineCollector.setFlinkCollector(out);
valueIterator.set(values.iterator());
combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
@@ -130,9 +131,9 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
@SuppressWarnings("unchecked")
@Override
- public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+ public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
- Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+ Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
@@ -144,7 +145,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
-
+
out.writeObject(reducer.getClass());
out.writeObject(combiner.getClass());
jobConf.write(out);
@@ -152,15 +153,15 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-
- Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
- (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+
+ Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
+ (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
reducer = InstantiationUtil.instantiate(reducerClass);
-
- Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
- (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+
+ Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>> combinerClass =
+ (Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>>) in.readObject();
combiner = InstantiationUtil.instantiate(combinerClass);
-
+
jobConf = new JobConf();
jobConf.readFields(in);
}
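Usage mirrors the Mapper wrapper, with the same mapred Reducer type usable for both the reduce and the combine side. Continuing the hypothetical word count from above (SumReducer is an assumed Reducer<Text, LongWritable, Text, LongWritable> that sums the values per key):

    // the combiner runs on the combine side, the reducer on the grouped result
    DataSet<Tuple2<Text, LongWritable>> counts = words
            .groupBy(0)
            .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                    new SumReducer(), new SumReducer()));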
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index 55aea24..fadd0b2 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -18,70 +18,71 @@
package org.apache.flink.hadoopcompatibility.mapred;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
+
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
/**
- * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
*/
@SuppressWarnings("rawtypes")
@Public
-public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
- implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
+ implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
- private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+ private transient Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
private transient JobConf jobConf;
-
+
private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
- private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+ private transient HadoopOutputCollector<KEYOUT, VALUEOUT> reduceCollector;
private transient Reporter reporter;
-
+
/**
* Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- *
+ *
* @param hadoopReducer The Hadoop Reducer to wrap.
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
this(hadoopReducer, new JobConf());
}
-
+
/**
* Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- *
+ *
* @param hadoopReducer The Hadoop Reducer to wrap.
* @param conf The JobConf that is used to configure the Hadoop Reducer.
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
- if(hadoopReducer == null) {
+ if (hadoopReducer == null) {
throw new NullPointerException("Reducer may not be null.");
}
- if(conf == null) {
+ if (conf == null) {
throw new NullPointerException("JobConf may not be null.");
}
-
+
this.reducer = hadoopReducer;
this.jobConf = conf;
}
@@ -91,7 +92,7 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.reducer.configure(jobConf);
-
+
this.reporter = new HadoopDummyReporter();
this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
@@ -100,9 +101,9 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
}
@Override
- public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+ public void reduce(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
throws Exception {
-
+
reduceCollector.setFlinkCollector(out);
valueIterator.set(values.iterator());
reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
@@ -110,32 +111,32 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
@SuppressWarnings("unchecked")
@Override
- public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+ public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
- Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+ Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
- return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
+ return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
/**
- * Custom serialization methods
+ * Custom serialization methods.
* @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
-
+
out.writeObject(reducer.getClass());
- jobConf.write(out);
+ jobConf.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-
- Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
- (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+
+ Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
+ (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
reducer = InstantiationUtil.instantiate(reducerClass);
-
+
jobConf = new JobConf();
jobConf.readFields(in);
}
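The non-combinable variant takes a single Reducer. Note the custom writeObject/readObject above: mapred Reducer implementations are not required to be Serializable, so the wrapper ships only the reducer class plus the serialized JobConf and re-instantiates the reducer when deserialized. Usage, continuing the same hypothetical pipeline:

    // same as above, but no combine phase is applied before the shuffle
    DataSet<Tuple2<Text, LongWritable>> counts = words
            .groupBy(0)
            .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
                    new SumReducer()));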
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
index bfe03d3..ff9e686 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+
import org.apache.hadoop.mapred.OutputCollector;
import java.io.IOException;
@@ -28,24 +29,24 @@ import java.io.IOException;
* A Hadoop OutputCollector that wraps a Flink OutputCollector.
* On each call of collect() the data is forwarded to the wrapped Flink collector.
*/
-public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
+public final class HadoopOutputCollector<KEY, VALUE> implements OutputCollector<KEY, VALUE> {
- private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+ private Collector<Tuple2<KEY, VALUE>> flinkCollector;
- private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+ private final Tuple2<KEY, VALUE> outTuple = new Tuple2<KEY, VALUE>();
/**
* Set the wrapped Flink collector.
- *
+ *
* @param flinkCollector The wrapped Flink OutputCollector.
*/
public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
this.flinkCollector = flinkCollector;
}
-
+
/**
- * Use the wrapped Flink collector to collect a key-value pair for Flink.
- *
+ * Use the wrapped Flink collector to collect a key-value pair for Flink.
+ *
* @param key the key to collect
* @param val the value to collect
* @throws IOException unexpected of key or value in key-value pair.
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
index 2d204b8..c58b5df 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -18,26 +18,26 @@
package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-import java.util.Iterator;
-
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
import org.apache.flink.api.java.tuple.Tuple2;
+import java.util.Iterator;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
*/
-public class HadoopTupleUnwrappingIterator<KEY,VALUE>
+public class HadoopTupleUnwrappingIterator<KEY, VALUE>
extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private final TypeSerializer<KEY> keySerializer;
- private transient Iterator<Tuple2<KEY,VALUE>> iterator;
-
+ private transient Iterator<Tuple2<KEY, VALUE>> iterator;
+
private transient KEY curKey;
private transient VALUE firstValue;
private transient boolean atFirst;
@@ -45,16 +45,16 @@ public class HadoopTupleUnwrappingIterator<KEY,VALUE>
public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
this.keySerializer = checkNotNull(keySerializer);
}
-
+
/**
* Set the Flink iterator to wrap.
- *
+ *
* @param iterator The Flink iterator to wrap.
*/
@Override
- public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+ public void set(final Iterator<Tuple2<KEY, VALUE>> iterator) {
this.iterator = iterator;
- if(this.hasNext()) {
+ if (this.hasNext()) {
final Tuple2<KEY, VALUE> tuple = iterator.next();
this.curKey = keySerializer.copy(tuple.f0);
this.firstValue = tuple.f1;
@@ -63,30 +63,30 @@ public class HadoopTupleUnwrappingIterator<KEY,VALUE>
this.atFirst = false;
}
}
-
+
@Override
public boolean hasNext() {
- if(this.atFirst) {
+ if (this.atFirst) {
return true;
}
return iterator.hasNext();
}
-
+
@Override
public VALUE next() {
- if(this.atFirst) {
+ if (this.atFirst) {
this.atFirst = false;
return firstValue;
}
-
+
final Tuple2<KEY, VALUE> tuple = iterator.next();
return tuple.f1;
}
-
+
public KEY getCurrentKey() {
return this.curKey;
}
-
+
@Override
public void remove() {
throw new UnsupportedOperationException();
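The iterator's contract is visible in the hunks above: set() eagerly consumes the first tuple, copies its key through the key serializer (Writable instances are mutable and commonly reused), and replays the held-back first value on the next call to next(). A small behavioral sketch with hypothetical values:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    import java.util.Arrays;

    public class IteratorSketch {
        public static void main(String[] args) {
            HadoopTupleUnwrappingIterator<IntWritable, Text> it =
                    new HadoopTupleUnwrappingIterator<>(
                            new WritableSerializer<IntWritable>(IntWritable.class));

            it.set(Arrays.asList(
                    new Tuple2<>(new IntWritable(1), new Text("a")),
                    new Tuple2<>(new IntWritable(1), new Text("b"))).iterator());

            System.out.println(it.getCurrentKey()); // 1 (copied key, stable across next())
            while (it.hasNext()) {
                System.out.println(it.next());      // a, then b
            }
        }
    }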
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
index 133a5f4..a59af64 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -15,11 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.hadoopcompatibility.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.hadoop.mapreduce
-import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
import org.apache.hadoop.fs.{Path => HadoopPath}
import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
@@ -81,7 +81,7 @@ object HadoopInputs {
key,
value,
inputPath
- )
+ )
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
index 2aefd9f..1fb3407 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
-
import org.junit.Test;
import java.io.DataInput;
@@ -33,8 +32,14 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Tests for the type extraction of {@link Writable}.
+ */
@SuppressWarnings("serial")
public class WritableExtractionTest {
@@ -64,7 +69,7 @@ public class WritableExtractionTest {
TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
- TypeInformation<ViaAbstractClassExtension> info3 =
+ TypeInformation<ViaAbstractClassExtension> info3 =
TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
}
@@ -110,7 +115,7 @@ public class WritableExtractionTest {
}
};
- TypeInformation<DirectWritable> outType =
+ TypeInformation<DirectWritable> outType =
TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
assertTrue(outType instanceof WritableTypeInfo);
@@ -119,14 +124,14 @@ public class WritableExtractionTest {
@Test
public void testExtractAsPartOfPojo() {
- PojoTypeInfo<PojoWithWritable> pojoInfo =
+ PojoTypeInfo<PojoWithWritable> pojoInfo =
(PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
boolean foundWritable = false;
for (int i = 0; i < pojoInfo.getArity(); i++) {
PojoField field = pojoInfo.getPojoFieldAt(i);
String name = field.getField().getName();
-
+
if (name.equals("hadoopCitizen")) {
if (foundWritable) {
fail("already seen");
@@ -134,10 +139,10 @@ public class WritableExtractionTest {
foundWritable = true;
assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
-
+
}
}
-
+
assertTrue("missed the writable type", foundWritable);
}
@@ -152,9 +157,9 @@ public class WritableExtractionTest {
};
@SuppressWarnings("unchecked")
- TypeInformation<Writable> inType =
+ TypeInformation<Writable> inType =
(TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
-
+
try {
TypeExtractor.getMapReturnTypes(function, inType);
fail("exception expected");
@@ -168,11 +173,11 @@ public class WritableExtractionTest {
// test type classes
// ------------------------------------------------------------------------
- public interface ExtendedWritable extends Writable {}
+ private interface ExtendedWritable extends Writable {}
- public static abstract class AbstractWritable implements Writable {}
+ private abstract static class AbstractWritable implements Writable {}
- public static class DirectWritable implements Writable {
+ private static class DirectWritable implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@@ -181,7 +186,7 @@ public class WritableExtractionTest {
public void readFields(DataInput dataInput) throws IOException {}
}
- public static class ViaInterfaceExtension implements ExtendedWritable {
+ private static class ViaInterfaceExtension implements ExtendedWritable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@@ -190,7 +195,7 @@ public class WritableExtractionTest {
public void readFields(DataInput dataInput) throws IOException {}
}
- public static class ViaAbstractClassExtension extends AbstractWritable {
+ private static class ViaAbstractClassExtension extends AbstractWritable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@@ -199,6 +204,9 @@ public class WritableExtractionTest {
public void readFields(DataInput dataInput) throws IOException {}
}
+ /**
+ * Test Pojo containing a {@link DirectWritable}.
+ */
public static class PojoWithWritable {
public String str;
public DirectWritable hadoopCitizen;
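What the test above pins down, in one hedged line of usage: any class implementing Hadoop's Writable (directly, via an interface, or via an abstract class) should be extracted as a WritableTypeInfo. A self-contained sketch using the stock IntWritable; the class name WritableExtractionSketch is a placeholder:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    import org.apache.hadoop.io.IntWritable;

    public class WritableExtractionSketch {
        public static void main(String[] args) {
            // IntWritable implements Writable, so extraction should take the Hadoop path.
            TypeInformation<IntWritable> info = TypeExtractor.getForClass(IntWritable.class);
            System.out.println(info instanceof WritableTypeInfo); // expected: true
        }
    }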
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
index 3d2b652..7262bb7 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.Test;
@@ -31,6 +31,9 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+/**
+ * Tests for the type information parsing of {@link Writable}.
+ */
public class WritableInfoParserTest {
@Test
@@ -66,7 +69,7 @@ public class WritableInfoParserTest {
// Test types
// ------------------------------------------------------------------------
- public static class MyWritable implements Writable {
+ private static class MyWritable implements Writable {
@Override
public void write(DataOutput out) throws IOException {}
@@ -75,6 +78,9 @@ public class WritableInfoParserTest {
public void readFields(DataInput in) throws IOException {}
}
+ /**
+ * Test Pojo containing a {@link Writable}.
+ */
public static class MyPojo {
public Integer basic;
public Tuple2<String, Integer> tuple;
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
index 666ab84..903c856 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -18,11 +18,13 @@
package org.apache.flink.api.java.typeutils;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+import org.apache.hadoop.io.Writable;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
-import org.apache.hadoop.io.Writable;
/**
* Test for {@link WritableTypeInfo}.
@@ -41,7 +43,7 @@ public class WritableTypeInfoTest extends TypeInformationTestBase<WritableTypeIn
// test types
// ------------------------------------------------------------------------
- public static class TestClass implements Writable {
+ private static class TestClass implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@@ -50,7 +52,7 @@ public class WritableTypeInfoTest extends TypeInformationTestBase<WritableTypeIn
public void readFields(DataInput dataInput) throws IOException {}
}
- public static class AlternateClass implements Writable {
+ private static class AlternateClass implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
index 8c3a8cd..6101c0a 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -19,64 +19,68 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.configuration.ConfigConstants;
+
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+/**
+ * A {@link Writable} and {@link Comparable} wrapper for a string array.
+ */
public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> {
-
+
private String[] array = new String[0];
-
+
public StringArrayWritable() {
super();
}
-
+
public StringArrayWritable(String[] array) {
this.array = array;
}
-
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.array.length);
-
- for(String str : this.array) {
+
+ for (String str : this.array) {
byte[] b = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
out.writeInt(b.length);
out.write(b);
}
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
this.array = new String[in.readInt()];
-
- for(int i = 0; i < this.array.length; i++) {
+
+ for (int i = 0; i < this.array.length; i++) {
byte[] b = new byte[in.readInt()];
in.readFully(b);
this.array[i] = new String(b, ConfigConstants.DEFAULT_CHARSET);
}
}
-
+
@Override
public int compareTo(StringArrayWritable o) {
- if(this.array.length != o.array.length) {
+ if (this.array.length != o.array.length) {
return this.array.length - o.array.length;
}
-
- for(int i = 0; i < this.array.length; i++) {
+
+ for (int i = 0; i < this.array.length; i++) {
int comp = this.array[i].compareTo(o.array[i]);
- if(comp != 0) {
+ if (comp != 0) {
return comp;
}
}
return 0;
}
-
+
@Override
public boolean equals(Object obj) {
- if(!(obj instanceof StringArrayWritable)) {
+ if (!(obj instanceof StringArrayWritable)) {
return false;
}
return this.compareTo((StringArrayWritable) obj) == 0;
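A quick roundtrip through the write/readFields contract exercised above; WritableRoundtripSketch is a placeholder name, and the sketch assumes the test class is on the classpath:

    import org.apache.flink.api.java.typeutils.runtime.StringArrayWritable;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class WritableRoundtripSketch {
        public static void main(String[] args) throws Exception {
            StringArrayWritable original = new StringArrayWritable(new String[]{"a", "b"});
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes)); // serialize via the Writable contract

            StringArrayWritable copy = new StringArrayWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(original.compareTo(copy)); // expected: 0
        }
    }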
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
index 96f844c..104f754 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -22,30 +22,33 @@ import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+/**
+ * Tests for the {@link WritableComparator}.
+ */
public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {
-
+
StringArrayWritable[] data = new StringArrayWritable[]{
new StringArrayWritable(new String[]{}),
new StringArrayWritable(new String[]{""}),
- new StringArrayWritable(new String[]{"a","a"}),
- new StringArrayWritable(new String[]{"a","b"}),
- new StringArrayWritable(new String[]{"c","c"}),
- new StringArrayWritable(new String[]{"d","f"}),
- new StringArrayWritable(new String[]{"d","m"}),
- new StringArrayWritable(new String[]{"z","x"}),
- new StringArrayWritable(new String[]{"a","a", "a"})
+ new StringArrayWritable(new String[]{"a", "a"}),
+ new StringArrayWritable(new String[]{"a", "b"}),
+ new StringArrayWritable(new String[]{"c", "c"}),
+ new StringArrayWritable(new String[]{"d", "f"}),
+ new StringArrayWritable(new String[]{"d", "m"}),
+ new StringArrayWritable(new String[]{"z", "x"}),
+ new StringArrayWritable(new String[]{"a", "a", "a"})
};
-
+
@Override
protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) {
return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class);
}
-
+
@Override
protected TypeSerializer<StringArrayWritable> createSerializer() {
return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
}
-
+
@Override
protected StringArrayWritable[] getSortedTestData() {
return data;
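Since the comparator is fully configured by its constructor arguments, two records can also be compared directly; a hedged sketch (WritableComparatorSketch is a placeholder name):

    import org.apache.flink.api.java.typeutils.runtime.StringArrayWritable;
    import org.apache.flink.api.java.typeutils.runtime.WritableComparator;

    public class WritableComparatorSketch {
        public static void main(String[] args) {
            // true = ascending order, as in createComparator(boolean) above.
            WritableComparator<StringArrayWritable> comparator =
                new WritableComparator<>(true, StringArrayWritable.class);
            int result = comparator.compare(
                new StringArrayWritable(new String[]{"a"}),
                new StringArrayWritable(new String[]{"b"}));
            System.out.println(result < 0); // expected: true ("a" sorts first)
        }
    }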
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
index 94e759d..f8d86de 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.UUID;
+/**
+ * Tests for the {@link WritableComparator} with {@link WritableID}.
+ */
public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
@Override
protected TypeComparator<WritableID> createComparator(boolean ascending) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
index 4274cf6..47ddf42 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils.runtime;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
@@ -25,6 +26,9 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.UUID;
+/**
+ * Test object that is both {@link Comparable} and {@link Writable}.
+ */
public class WritableID implements WritableComparable<WritableID> {
private UUID uuid;
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
index bb5f4d4..9779c17 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -22,29 +22,33 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+
import org.junit.Test;
+/**
+ * Tests for the {@link WritableSerializer}.
+ */
public class WritableSerializerTest {
-
+
@Test
public void testStringArrayWritable() {
StringArrayWritable[] data = new StringArrayWritable[]{
new StringArrayWritable(new String[]{}),
new StringArrayWritable(new String[]{""}),
- new StringArrayWritable(new String[]{"a","a"}),
- new StringArrayWritable(new String[]{"a","b"}),
- new StringArrayWritable(new String[]{"c","c"}),
- new StringArrayWritable(new String[]{"d","f"}),
- new StringArrayWritable(new String[]{"d","m"}),
- new StringArrayWritable(new String[]{"z","x"}),
- new StringArrayWritable(new String[]{"a","a", "a"})
+ new StringArrayWritable(new String[]{"a", "a"}),
+ new StringArrayWritable(new String[]{"a", "b"}),
+ new StringArrayWritable(new String[]{"c", "c"}),
+ new StringArrayWritable(new String[]{"d", "f"}),
+ new StringArrayWritable(new String[]{"d", "m"}),
+ new StringArrayWritable(new String[]{"z", "x"}),
+ new StringArrayWritable(new String[]{"a", "a", "a"})
};
-
+
WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig());
-
- SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data);
-
+
+ SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer, writableTypeInfo.getTypeClass(), -1, data);
+
testInstance.testAll();
}
}
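The same serializer can be obtained straight from a WritableTypeInfo outside a test harness; a hedged sketch using TypeSerializer#copy for a deep copy (the wrapper class name is a placeholder):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    import org.apache.flink.api.java.typeutils.runtime.StringArrayWritable;

    public class WritableSerializerSketch {
        public static void main(String[] args) {
            TypeSerializer<StringArrayWritable> serializer =
                new WritableTypeInfo<>(StringArrayWritable.class)
                    .createSerializer(new ExecutionConfig());
            StringArrayWritable original = new StringArrayWritable(new String[]{"x", "y"});
            StringArrayWritable copy = serializer.copy(original); // deep copy of the record
            System.out.println(original.compareTo(copy) == 0 && original != copy); // true
        }
    }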
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
index 2af7730..dca043d 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.UUID;
+/**
+ * Tests for the {@link WritableSerializer} with {@link WritableID}.
+ */
public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
@Override
protected TypeSerializer<WritableID> createSerializer() {
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
index 6f7673b..3bda1e6 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -20,10 +20,14 @@ package org.apache.flink.hadoopcompatibility;
import org.apache.flink.api.java.utils.AbstractParameterToolTest;
import org.apache.flink.api.java.utils.ParameterTool;
+
import org.junit.Test;
import java.io.IOException;
+/**
+ * Tests for the {@link HadoopUtils}.
+ */
public class HadoopUtilsTest extends AbstractParameterToolTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
index 4d1acb4..2fb2f88 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -18,14 +18,13 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
-import java.io.IOException;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -38,6 +37,11 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
+
+/**
+ * IT cases for the {@link HadoopMapFunction}.
+ */
@RunWith(Parameterized.class)
public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
@@ -124,53 +128,60 @@ public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
compareResultsByLinesInMemory(expected, resultPath);
}
-
-
+ /**
+ * {@link Mapper} that only forwards records containing "bananas".
+ */
public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
-
+
@Override
- public void map(final IntWritable k, final Text v,
+ public void map(final IntWritable k, final Text v,
final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
- if ( v.toString().contains("bananas") ) {
- out.collect(k,v);
+ if (v.toString().contains("bananas")) {
+ out.collect(k, v);
}
}
-
+
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
-
+
+ /**
+ * {@link Mapper} that duplicates records.
+ */
public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
-
+
@Override
- public void map(final IntWritable k, final Text v,
+ public void map(final IntWritable k, final Text v,
final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
out.collect(k, v);
out.collect(k, new Text(v.toString().toUpperCase()));
}
-
+
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
-
+
+ /**
+ * {@link Mapper} that filters records based on a prefix.
+ */
public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
private String filterPrefix;
-
+
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
throws IOException {
- if(v.toString().startsWith(filterPrefix)) {
+ if (v.toString().startsWith(filterPrefix)) {
out.collect(k, v);
}
}
-
+
@Override
public void configure(JobConf c) {
filterPrefix = c.get("my.filterPrefix");
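For context, a hedged sketch of how one of these mappers is plugged into a Flink job via HadoopMapFunction, mirroring what the IT case does; the wrapper class and the inline sample data are placeholders:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
    import org.apache.flink.test.hadoopcompatibility.mapred.HadoopMapFunctionITCase;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;

    public class HadoopMapFunctionSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<IntWritable, Text>> input = env.fromElements(
                new Tuple2<>(new IntWritable(1), new Text("Hello there")),
                new Tuple2<>(new IntWritable(2), new Text("bananas")));

            JobConf conf = new JobConf();
            conf.set("my.filterPrefix", "Hello"); // read by ConfigurableMapper.configure()
            DataSet<Tuple2<IntWritable, Text>> filtered = input.flatMap(
                new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(
                    new HadoopMapFunctionITCase.ConfigurableMapper(), conf));
            filtered.print();
        }
    }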
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index 0b5a366..145eaaa 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -22,11 +22,15 @@ import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredComp
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.OperatingSystem;
+
import org.junit.Assume;
import org.junit.Before;
+/**
+ * IT cases for mapred.
+ */
public class HadoopMapredITCase extends JavaProgramTestBase {
-
+
protected String textPath;
protected String resultPath;
@@ -47,7 +51,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
}
-
+
@Override
protected void testProgram() throws Exception {
HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });