You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/14 23:26:33 UTC
[4/4] incubator-flink git commit: [FLINK-1305] [FLINK-1304] Test for
HadoopInputWrapper and NullWritable support
[FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support
This closes #252
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/13968cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/13968cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/13968cd4
Branch: refs/heads/master
Commit: 13968cd4de446b4f565a094554380eb8559b6cf9
Parents: de7f478
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Dec 5 19:19:29 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 16:38:32 2014 +0100
----------------------------------------------------------------------
.../mapred/HadoopIOFormatsITCase.java | 230 +++++++++++++++++++
flink-java/pom.xml | 35 +++
.../typeutils/runtime/WritableSerializer.java | 4 +
.../java/org/apache/hadoop/io/Writable.java | 105 ---------
4 files changed, 269 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..6ef0f2e
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+ private static int NUM_PROGRAMS = 2;
+
+ private int curProgId = config.getInteger("ProgramId", -1);
+ private String[] resultPath;
+ private String[] expectedResult;
+ private String sequenceFileInPath;
+ private String sequenceFileInPathNull;
+
+ /**
+ * Creates the test case for a single parameter set.
+ *
+ * @param config configuration passed on to the JavaProgramTestBase constructor;
+ * getConfigurations() stores the program id in it under "ProgramId"
+ */
+ public HadoopIOFormatsITCase(Configuration config) {
+ super(config);
+ }
+
+    /**
+     * Sets up the result locations and writes the two sequence files read by the test programs.
+     *
+     * <p>The first file holds LongWritable/Text pairs for the keys 0 to 3, where key 1 is
+     * written 16 times and every other key once. The second file holds NullWritable keys
+     * with the LongWritable values 0 to 3.</p>
+     *
+     * @throws Exception if a temp file or a sequence file cannot be created or written
+     */
+    @Override
+    protected void preSubmit() throws Exception {
+        resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
+
+        File sequenceFile = createAndRegisterTempFile("seqFile");
+        sequenceFileInPath = sequenceFile.toURI().toString();
+
+        // Create a sequence file
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        // Resolve the file system from the file's own "file:" URI. URI.create() on a raw
+        // absolute path throws for paths containing spaces or backslashes, and a scheme-less
+        // URI would select whatever default file system the Hadoop configuration names.
+        FileSystem fs = FileSystem.get(sequenceFile.toURI(), conf);
+        Path path = new Path(sequenceFile.getAbsolutePath());
+
+        // ------------------ Long / Text Key Value pair: ------------
+        int kvCount = 4;
+
+        LongWritable key = new LongWritable();
+        Text value = new Text();
+        SequenceFile.Writer writer = null;
+        try {
+            writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
+            for (int i = 0; i < kvCount; i ++) {
+                // write key = 1 a bit more often (16 records); every other key once.
+                int copies = (i == 1) ? 16 : 1;
+                for (int a = 0; a < copies; a++) {
+                    key.set(i);
+                    value.set(i+" - somestring");
+                    writer.append(key, value);
+                }
+            }
+        } finally {
+            IOUtils.closeStream(writer);
+        }
+
+
+        // ------------------ Null / Long Key Value pair: ------------
+
+        File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+        sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+        path = new Path(sequenceFileInPathNull);
+
+        LongWritable value1 = new LongWritable();
+        SequenceFile.Writer writer1 = null;
+        try {
+            writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
+            for (int i = 0; i < kvCount; i ++) {
+                value1.set(i);
+                writer1.append(NullWritable.get(), value1);
+            }
+        } finally {
+            IOUtils.closeStream(writer1);
+        }
+    }
+
+ /**
+ * Runs the program selected by curProgId and keeps the expected output it returns
+ * for the comparison in postSubmit().
+ */
+ @Override
+ protected void testProgram() throws Exception {
+ expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
+ }
+
+ /**
+ * Checks every result location against the expected content returned by the program;
+ * expectedResult[i] belongs to resultPath[i].
+ */
+ @Override
+ protected void postSubmit() throws Exception {
+ for(int i = 0; i < resultPath.length; i++) {
+ // NOTE(review): the expected strings of program 2 are not sorted, so this helper
+ // presumably compares lines irrespective of order - confirm in the test base class.
+ compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
+ }
+ }
+
+ @Parameters
+ public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+ LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+ for(int i=1; i <= NUM_PROGRAMS; i++) {
+ Configuration config = new Configuration();
+ config.setInteger("ProgramId", i);
+ tConfigs.add(config);
+ }
+
+ return toParameterList(tConfigs);
+ }
+
+    /**
+     * The Flink programs exercised by this test, selected by program id.
+     */
+    public static class HadoopIOFormatPrograms {
+
+        /**
+         * Runs the program with the given id and returns the expected content of each result path.
+         *
+         * @param progId 1 for the LongWritable/Text program, 2 for the NullWritable-key program
+         * @param resultPath the two locations the program writes its results to
+         * @param sequenceFileInPath input of program 1
+         * @param sequenceFileInPathNull input of program 2
+         * @return expected content, index-aligned with resultPath
+         * @throws IllegalArgumentException if progId is neither 1 nor 2
+         */
+        public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+            switch(progId) {
+                case 1:
+                    return runKeyAccessProgram(resultPath, sequenceFileInPath);
+                case 2:
+                    return runNullKeyProgram(resultPath, sequenceFileInPathNull);
+                default:
+                    throw new IllegalArgumentException("Invalid program id");
+            }
+        }
+
+        /**
+         * Test sequence file, including a key access.
+         */
+        private static String[] runKeyAccessProgram(String[] resultPath, String inputPath) throws Exception {
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            JobConf jobConf = new JobConf();
+            SequenceFileInputFormat.addInputPath(jobConf, new Path(inputPath));
+            SequenceFileInputFormat<LongWritable, Text> hadoopFormat = new SequenceFileInputFormat<LongWritable, Text>();
+            HadoopInputFormat<LongWritable, Text> wrappedFormat =
+                    new HadoopInputFormat<LongWritable, Text>(hadoopFormat, LongWritable.class, Text.class, jobConf);
+            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(wrappedFormat);
+
+            // First sink: unwrap the key to a Long and sum over it.
+            MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>> unwrapKey =
+                    new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+                @Override
+                public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> record) throws Exception {
+                    return new Tuple2<Long, Text>(record.f0.get(), record.f1);
+                }
+            };
+            DataSet<Tuple2<Long, Text>> keySum = input.map(unwrapKey).sum(0);
+            keySum.writeAsText(resultPath[0]);
+
+            // Second sink: one line per distinct key.
+            MapFunction<Tuple2<LongWritable, Text>, String> formatRecord =
+                    new MapFunction<Tuple2<LongWritable, Text>, String>() {
+                @Override
+                public String map(Tuple2<LongWritable, Text> record) throws Exception {
+                    return record.f1 + " - " + record.f0.get();
+                }
+            };
+            DataSet<String> distinctLines = input.distinct(0).map(formatRecord);
+            distinctLines.writeAsText(resultPath[1]);
+
+            env.execute();
+
+            // return expected result
+            String expectedSum = "(21,3 - somestring)";
+            String expectedDistinct = "0 - somestring - 0\n" +
+                    "1 - somestring - 1\n" +
+                    "2 - somestring - 2\n" +
+                    "3 - somestring - 3\n";
+            return new String [] {expectedSum, expectedDistinct};
+        }
+
+        /**
+         * Reads the sequence file with NullWritable keys, maps the values to Long and writes
+         * them both directly and grouped/summed by value.
+         */
+        private static String[] runNullKeyProgram(String[] resultPath, String inputPath) throws Exception {
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            JobConf jobConf = new JobConf();
+            SequenceFileInputFormat.addInputPath(jobConf, new Path(inputPath));
+            SequenceFileInputFormat<NullWritable, LongWritable> hadoopFormat = new SequenceFileInputFormat<NullWritable, LongWritable>();
+            HadoopInputFormat<NullWritable, LongWritable> wrappedFormat =
+                    new HadoopInputFormat<NullWritable, LongWritable>(hadoopFormat, NullWritable.class, LongWritable.class, jobConf);
+            DataSet<Tuple2<NullWritable, LongWritable>> input = env.createInput(wrappedFormat);
+
+            MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>> unwrapValue =
+                    new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+                @Override
+                public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> record) throws Exception {
+                    return new Tuple2<Void, Long>(null, record.f1.get());
+                }
+            };
+            DataSet<Tuple2<Void, Long>> plain = input.map(unwrapValue);
+            DataSet<Tuple2<Void, Long>> grouped = plain.groupBy(1).sum(1);
+
+            // The grouped result goes to the second location, the plain one to the first.
+            grouped.writeAsText(resultPath[1]);
+            plain.writeAsText(resultPath[0]);
+            env.execute();
+
+            // return expected result
+            String expectedPlain = "(null,2)\n" +
+                    "(null,0)\n" +
+                    "(null,1)\n" +
+                    "(null,3)";
+            String expectedGrouped = "(null,0)\n" +
+                    "(null,1)\n" +
+                    "(null,2)\n" +
+                    "(null,3)";
+            return new String [] {expectedPlain, expectedGrouped};
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 4ae2cc3..21a0b68 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -86,4 +86,39 @@ under the License.
</plugin>
</plugins>
</build>
+
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <!-- Selected when the profile property below is set to 1 -->
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+ </property>
+ </activation>
+ <dependencies>
+ <!-- "Old" Hadoop = MapReduce v1 -->
+ <!-- No version given here; presumably managed by the parent pom (verify) -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <!-- Selected when the profile property below is undefined (the leading '!' negates the check) -->
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop2--><name>!hadoop.profile</name>
+ </property>
+ </activation>
+ <dependencies>
+ <!-- No version given here; presumably managed by the parent pom (verify) -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 0fe8fdf..e838d27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import com.esotericsoftware.kryo.Kryo;
@@ -44,6 +45,9 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
@Override
public T createInstance() {
+ if(typeClass == NullWritable.class) {
+ return (T) NullWritable.get();
+ }
return InstantiationUtil.instantiate(typeClass);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java b/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
deleted file mode 100644
index 16efe7f..0000000
--- a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// ================================================================================================
-// !!! NOTICE !!!
-//
-// This interface has been directly copied from the Apache Hadoop project.
-// It has been added to this project to allow compiling against the type "Writable"
-// without adding the heavyweight Hadoop dependency. This keeps the project dependencies
-// lightweight.
-//
-// At runtime, the JVM will load either this interface, or the interface from a Hadoop jar,
-// if present. In both cases, the dynamic class loading, linking, and method lookup will
-// allow the types to interoperate as long as package name, class name, and method signature
-// of this interface are kept strictly in sync with the version packaged with Hadoop.
-//
-// This is a core interface of the Hadoop project and has been stable across all releases.
-//
-// ================================================================================================
-
-package org.apache.hadoop.io;
-
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.io.IOException;
-
-
-/**
- * A serializable object which implements a simple, efficient, serialization
- * protocol, based on {@link DataInput} and {@link DataOutput}.
- *
- * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
- * framework implements this interface.</p>
- *
- * <p>Implementations typically implement a static <code>read(DataInput)</code>
- * method which constructs a new instance, calls {@link #readFields(DataInput)}
- * and returns the instance.</p>
- *
- * <p>Example:</p>
- * <p><blockquote><pre>
- * public class MyWritable implements Writable {
- * // Some data
- * private int counter;
- * private long timestamp;
- *
- * // Default constructor to allow (de)serialization
- * MyWritable() { }
- *
- * public void write(DataOutput out) throws IOException {
- * out.writeInt(counter);
- * out.writeLong(timestamp);
- * }
- *
- * public void readFields(DataInput in) throws IOException {
- * counter = in.readInt();
- * timestamp = in.readLong();
- * }
- *
- * public static MyWritable read(DataInput in) throws IOException {
- * MyWritable w = new MyWritable();
- * w.readFields(in);
- * return w;
- * }
- * }
- * </pre></blockquote></p>
- */
-public interface Writable {
- /**
- * Serialize the fields of this object to <code>out</code>.
- *
- * @param out
- * <code>DataOuput</code> to serialize this object into.
- * @throws IOException
- */
- void write(DataOutput out) throws IOException;
-
- /**
- * Deserialize the fields of this object from <code>in</code>.
- *
- * <p>
- * For efficiency, implementations should attempt to re-use storage in the
- * existing object where possible.
- * </p>
- *
- * @param in
- * <code>DataInput</code> to deseriablize this object from.
- * @throws IOException
- */
- void readFields(DataInput in) throws IOException;
-}