You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/14 23:26:33 UTC

[4/4] incubator-flink git commit: [FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support

[FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support

This closes #252


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/13968cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/13968cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/13968cd4

Branch: refs/heads/master
Commit: 13968cd4de446b4f565a094554380eb8559b6cf9
Parents: de7f478
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Dec 5 19:19:29 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 16:38:32 2014 +0100

----------------------------------------------------------------------
 .../mapred/HadoopIOFormatsITCase.java           | 230 +++++++++++++++++++
 flink-java/pom.xml                              |  35 +++
 .../typeutils/runtime/WritableSerializer.java   |   4 +
 .../java/org/apache/hadoop/io/Writable.java     | 105 ---------
 4 files changed, 269 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..6ef0f2e
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 2;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String[] resultPath;
+	private String[] expectedResult;
+	private String sequenceFileInPath;
+	private String sequenceFileInPathNull;
+
+	public HadoopIOFormatsITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
+
+		File sequenceFile = createAndRegisterTempFile("seqFile");
+		sequenceFileInPath = sequenceFile.toURI().toString();
+
+		// Create a sequence file
+		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
+		Path path = new Path(sequenceFile.getAbsolutePath());
+
+		//  ------------------ Long / Text Key Value pair: ------------
+		int kvCount = 4;
+
+		LongWritable key = new LongWritable();
+		Text value = new Text();
+		SequenceFile.Writer writer = null;
+		try {
+			writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
+			for (int i = 0; i < kvCount; i ++) {
+				if(i == 1) {
+					// write key = 1 a bit more often.
+					for(int a = 0;a < 15; a++) {
+						key.set(i);
+						value.set(i+" - somestring");
+						writer.append(key, value);
+					}
+				}
+				key.set(i);
+				value.set(i+" - somestring");
+				writer.append(key, value);
+			}
+		} finally {
+			IOUtils.closeStream(writer);
+		}
+
+
+		//  ------------------ Null / Long Key Value pair: ------------
+
+		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+		sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+		path = new Path(sequenceFileInPathNull);
+
+		LongWritable value1 = new LongWritable();
+		SequenceFile.Writer writer1 = null;
+		try {
+			writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
+			for (int i = 0; i < kvCount; i ++) {
+				value1.set(i);
+				writer1.append(NullWritable.get(), value1);
+			}
+		} finally {
+			IOUtils.closeStream(writer1);
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		for(int i = 0; i < resultPath.length; i++) {
+			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
+		}
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class HadoopIOFormatPrograms {
+		
+		public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/**
+				 * Test sequence file, including a key access.
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
+				HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
+				DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
+				DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+					@Override
+					public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
+						return new Tuple2<Long, Text>(value.f0.get(), value.f1);
+					}
+				}).sum(0);
+				sumed.writeAsText(resultPath[0]);
+				DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+					@Override
+					public String map(Tuple2<LongWritable, Text> value) throws Exception {
+						return value.f1 + " - " + value.f0.get();
+					}
+				});
+				res.writeAsText(resultPath[1]);
+				env.execute();
+				
+				// return expected result
+				return 	new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
+						"1 - somestring - 1\n" +
+						"2 - somestring - 2\n" +
+						"3 - somestring - 3\n"};
+
+			}
+			case 2: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
+				HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
+				DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
+				DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+					@Override
+					public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
+						return new Tuple2<Void, Long>(null, value.f1.get());
+					}
+				});
+				DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
+				res1.writeAsText(resultPath[1]);
+				res.writeAsText(resultPath[0]);
+				env.execute();
+
+				// return expected result
+				return 	new String [] {"(null,2)\n" +
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,3)",
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,2)\n" +
+						"(null,3)"};
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 4ae2cc3..21a0b68 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -86,4 +86,39 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
+
+	<!-- See main pom.xml for explanation of profiles -->
+	<profiles>
+		<profile>
+			<id>hadoop-1</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop1--><name>hadoop.profile</name><value>1</value>
+				</property>
+			</activation>
+			<dependencies>
+				<!-- "Old" Hadoop = MapReduce v1 -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 0fe8fdf..e838d27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -44,6 +45,9 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	
 	@Override
 	public T createInstance() {
+		if(typeClass == NullWritable.class) {
+			return (T) NullWritable.get();
+		}
 		return InstantiationUtil.instantiate(typeClass);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java b/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
deleted file mode 100644
index 16efe7f..0000000
--- a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// ================================================================================================
-//                                 !!! NOTICE !!!
-// 
-//  This interface has been directly copied from the Apache Hadoop project.
-//  It has been added to this project to allow compiling against the type "Writable"
-//  without adding the heavyweight Hadoop dependency. This keeps the project dependencies 
-//  lightweight.
-//
-//  At runtime, the JVM will load either this interface, or the interface from a Hadoop jar,
-//  if present. In both cases, the dynamic class loading, linking, and method lookup will
-//  allow the types to interoperate as long as package name, class name, and method signature
-//  of this interface are kept strictly in sync with the version packaged with Hadoop.
-//
-//  This is a core interface of the Hadoop project and has been stable across all releases.
-//
-// ================================================================================================
-
-package org.apache.hadoop.io;
-
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.io.IOException;
-
-
-/**
- * A serializable object which implements a simple, efficient, serialization 
- * protocol, based on {@link DataInput} and {@link DataOutput}.
- *
- * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
- * framework implements this interface.</p>
- * 
- * <p>Implementations typically implement a static <code>read(DataInput)</code>
- * method which constructs a new instance, calls {@link #readFields(DataInput)} 
- * and returns the instance.</p>
- * 
- * <p>Example:</p>
- * <p><blockquote><pre>
- *     public class MyWritable implements Writable {
- *       // Some data
- *       private int counter;
- *       private long timestamp;
- *
- *       // Default constructor to allow (de)serialization
- *       MyWritable() { }
- *
- *       public void write(DataOutput out) throws IOException {
- *         out.writeInt(counter);
- *         out.writeLong(timestamp);
- *       }
- *
- *       public void readFields(DataInput in) throws IOException {
- *         counter = in.readInt();
- *         timestamp = in.readLong();
- *       }
- *
- *       public static MyWritable read(DataInput in) throws IOException {
- *         MyWritable w = new MyWritable();
- *         w.readFields(in);
- *         return w;
- *       }
- *     }
- * </pre></blockquote></p>
- */
-public interface Writable {
-	/**
-	 * Serialize the fields of this object to <code>out</code>.
-	 * 
-	 * @param out
-	 *            <code>DataOuput</code> to serialize this object into.
-	 * @throws IOException
-	 */
-	void write(DataOutput out) throws IOException;
-
-	/**
-	 * Deserialize the fields of this object from <code>in</code>.
-	 * 
-	 * <p>
-	 * For efficiency, implementations should attempt to re-use storage in the
-	 * existing object where possible.
-	 * </p>
-	 * 
-	 * @param in
-	 *            <code>DataInput</code> to deseriablize this object from.
-	 * @throws IOException
-	 */
-	void readFields(DataInput in) throws IOException;
-}