You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/26 18:51:25 UTC

[1/3] incubator-flink git commit: [FLINK-1286] [APIs] [runtime] Fix serialization in CollectionInputFormat and generate meaningful error messages

Repository: incubator-flink
Updated Branches:
  refs/heads/master 82f5154a9 -> 112b3a937


[FLINK-1286] [APIs] [runtime] Fix serialization in CollectionInputFormat and generate meaningful error messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d8589303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d8589303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d8589303

Branch: refs/heads/master
Commit: d85893036b9a3122900010ce975feba43b27531d
Parents: 82f5154
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 26 17:06:51 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 26 18:14:58 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java | 24 +++++++++-
 .../operators/util/UserCodeClassWrapper.java    |  5 ++
 .../operators/util/UserCodeObjectWrapper.java   |  5 ++
 .../common/operators/util/UserCodeWrapper.java  | 24 ++++++----
 .../api/java/io/CollectionInputFormat.java      | 28 ++++++++----
 .../api/java/io/CollectionInputFormatTest.java  |  5 +-
 .../runtime/jobgraph/InputFormatVertex.java     | 45 ++++++++++++++----
 .../runtime/jobgraph/OutputFormatVertex.java    | 48 ++++++++++++++------
 8 files changed, 137 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index eb6fe2e..bb537b7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.dag.TempMode;
@@ -835,6 +836,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSourceTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
 
 		// set user code
 		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
@@ -850,7 +852,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 
 		vertex.setInvokableClass(DataSinkTask.class);
 		vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
-
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
+		
 		// set user code
 		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
 		config.setStubParameters(node.getPactContract().getParameters());
@@ -1426,6 +1429,25 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
 		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
 	}
+	
+	private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+		try {
+			if (wrapper.hasObject()) {
+				try {
+					return wrapper.getUserCodeObject().toString();
+				}
+				catch (Throwable t) {
+					return wrapper.getUserCodeClass().getName();
+				}
+			}
+			else {
+				return wrapper.getUserCodeClass().getName();
+			}
+		}
+		catch (Throwable t) {
+			return null;
+		}
+	}
 
 	// -------------------------------------------------------------------------------------
 	// Descriptors for tasks / configurations that are chained or merged with other tasks

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
index 61fc382..1b1d5ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
@@ -53,4 +53,9 @@ public class UserCodeClassWrapper<T> implements UserCodeWrapper<T> {
 	public Class<? extends T> getUserCodeClass() {
 		return userCodeClass;
 	}
+	
+	@Override
+	public boolean hasObject() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
index 95a425e..f06de90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
@@ -121,4 +121,9 @@ public class UserCodeObjectWrapper<T> implements UserCodeWrapper<T> {
 	public Class<? extends T> getUserCodeClass() {
 		return (Class<? extends T>) userCodeObject.getClass();
 	}
+	
+	@Override
+	public boolean hasObject() {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
index 3d13041..032c906 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
@@ -22,29 +22,26 @@ import java.io.Serializable;
 import java.lang.annotation.Annotation;
 
 /**
- * PACT contracts can have either a class or an object containing the user
+ * UDF operators can have either a class or an object containing the user
  * code, this is the common interface to access them.
  */
 public interface UserCodeWrapper<T> extends Serializable {
+	
 	/**
-	 * Gets the user code object. In the case of a pact, that object will be the stub with the user function,
-	 * in the case of an input or output format, it will be the format object.
-	 * 
+	 * Gets the user code object, which may be either a function or an input or output format.
 	 * The subclass is supposed to just return the user code object or instantiate the class.
 	 * 
 	 * @return The class with the user code.
 	 */
-	public T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
+	T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
 	
 	/**
 	 * Gets the user code object. In the case of a pact, that object will be the stub with the user function,
 	 * in the case of an input or output format, it will be the format object.
 	 * 
-	 * The subclass is supposed to just return the user code object or instantiate the class.
-	 * 
 	 * @return The class with the user code.
 	 */
-	public T getUserCodeObject();
+	T getUserCodeObject();
 	
 	/**
 	 * Gets an annotation that pertains to the user code class. By default, this method will look for
@@ -55,7 +52,7 @@ public interface UserCodeWrapper<T> extends Serializable {
 	 *        the Class object corresponding to the annotation type
 	 * @return the annotation, or null if no annotation of the requested type was found
 	 */
-	public <A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
+	<A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
 	
 	/**
 	 * Gets the class of the user code. If the user code is provided as a class, this class is just returned.
@@ -63,5 +60,12 @@ public interface UserCodeWrapper<T> extends Serializable {
 	 * 
 	 * @return The class of the user code object.
 	 */
-	public Class<? extends T> getUserCodeClass ();
+	Class<? extends T> getUserCodeClass ();
+	
+	/**
+	 * Checks whether the wrapper already has an object, or whether it needs to instantiate it.
+	 * 
+	 * @return True if the wrapper already has an object, false if it has only a class.
+	 */
+	boolean hasObject();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index b999ede..97e8715 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -78,11 +78,15 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		out.defaultWriteObject();
-		out.writeInt(dataSet.size());
 		
-		OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
-		for (T element : dataSet){
-			serializer.serialize(element, wrapper);
+		final int size = dataSet.size();
+		out.writeInt(size);
+		
+		if (size > 0) {
+			OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
+			for (T element : dataSet){
+				serializer.serialize(element, wrapper);
+			}
 		}
 	}
 
@@ -92,11 +96,17 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
 		
-		InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
-		for (int i = 0; i < collectionLength; i++){
-			T element = serializer.createInstance();
-			element = serializer.deserialize(element, wrapper);
-			list.add(element);
+		if (collectionLength > 0) {
+			try {
+				InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
+				for (int i = 0; i < collectionLength; i++){
+					T element = serializer.deserialize(wrapper);
+					list.add(element);
+				}
+			}
+			catch (Throwable t) {
+				throw new IOException("Error while deserializing element from collection", t);
+			}
 		}
 
 		dataSet = list;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 948d22f..64dae22 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -246,11 +246,8 @@ public class CollectionInputFormatTest {
 				in.readObject();
 				fail("should throw an exception");
 			}
-			catch (TestException e) {
-				// expected
-			}
 			catch (Exception e) {
-				fail("Exception not properly forwarded");
+				assertTrue(e.getCause() instanceof TestException);
 			}
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 8ee4da4..0ea0da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -26,8 +26,7 @@ public class InputFormatVertex extends AbstractJobVertex {
 
 	private static final long serialVersionUID = 1L;
 	
-	/** Caches the output format associated to this output vertex. */
-	private transient InputFormat<?, ?> inputFormat;
+	private String formatDescription;
 	
 	
 	public InputFormatVertex(String name) {
@@ -39,19 +38,47 @@ public class InputFormatVertex extends AbstractJobVertex {
 	}
 	
 	
+	public void setFormatDescription(String formatDescription) {
+		this.formatDescription = formatDescription;
+	}
+	
+	public String getFormatDescription() {
+		return formatDescription;
+	}
+	
 	@Override
 	public void initializeOnMaster(ClassLoader loader) throws Exception {
-		if (inputFormat == null) {
-			TaskConfig cfg = new TaskConfig(getConfiguration());
-			UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.getStubWrapper(loader);
-			
-			if (wrapper == null) {
-				throw new Exception("No input format present in InputFormatVertex's task configuration.");
-			}
+		final TaskConfig cfg = new TaskConfig(getConfiguration());
+		
+		// deserialize from the payload
+		UserCodeWrapper<InputFormat<?, ?>> wrapper;
+		try {
 			
+			wrapper = cfg.getStubWrapper(loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Deserializing the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
+		if (wrapper == null) {
+			throw new Exception("No input format present in InputFormatVertex's task configuration.");
+		}
+		
+		// instantiate, if necessary
+		InputFormat<?, ?> inputFormat;
+		try {
 			inputFormat = wrapper.getUserCodeObject(InputFormat.class, loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Instantiating the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
+		
+		// configure
+		try {
 			inputFormat.configure(cfg.getStubParameters());
 		}
+		catch (Throwable t) {
+			throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
 		
 		setInputSplitSource(inputFormat);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index 365ed92..708b390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -31,10 +31,8 @@ public class OutputFormatVertex extends AbstractJobVertex {
 	
 	private static final long serialVersionUID = 1L;
 	
+	private String formatDescription;
 	
-	/** Caches the output format associated to this output vertex. */
-	private transient OutputFormat<?> outputFormat;
-
 	/**
 	 * Creates a new task vertex with the specified name.
 	 * 
@@ -44,23 +42,45 @@ public class OutputFormatVertex extends AbstractJobVertex {
 		super(name);
 	}
 	
+	public void setFormatDescription(String formatDescription) {
+		this.formatDescription = formatDescription;
+	}
+	
+	public String getFormatDescription() {
+		return formatDescription;
+	}
 	
 	@Override
 	public void initializeOnMaster(ClassLoader loader) throws Exception {
-		if (this.outputFormat == null) {
-			TaskConfig cfg = new TaskConfig(getConfiguration());
-			UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+		final TaskConfig cfg = new TaskConfig(getConfiguration());
 		
-			if (wrapper == null) {
-				throw new Exception("No output format present in OutputFormatVertex's task configuration.");
-			}
-
-			this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
-			this.outputFormat.configure(cfg.getStubParameters());
+		UserCodeWrapper<OutputFormat<?>> wrapper;
+		try {
+			wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Deserializing the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
+		if (wrapper == null) {
+			throw new Exception("No output format present in OutputFormatVertex's task configuration.");
+		}
+		
+		OutputFormat<?> outputFormat;
+		try {
+			outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
+		try {
+			outputFormat.configure(cfg.getStubParameters());
+		}
+		catch (Throwable t) {
+			throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
 		}
 		
-		if (this.outputFormat instanceof InitializeOnMaster) {
-			((InitializeOnMaster) this.outputFormat).initializeGlobal(getParallelism());
+		if (outputFormat instanceof InitializeOnMaster) {
+			((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
 		}
 	}
 }
 }


[2/3] incubator-flink git commit: [FLINK-1278] [runtime] (part 1) Remove special code paths for the Record data type in the input readers and the source task.

Posted by se...@apache.org.
[FLINK-1278] [runtime] (part 1) Remove special code paths for the Record data type in the input readers and the source task.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/48b6d01c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/48b6d01c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/48b6d01c

Branch: refs/heads/master
Commit: 48b6d01c2ec73b10bcb3d668dd0bac4dc715f524
Parents: d858930
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 26 17:18:55 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 26 18:14:59 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |   1 -
 .../flink/runtime/operators/DataSinkTask.java   |  22 +---
 .../flink/runtime/operators/DataSourceTask.java | 109 ++++++-------------
 .../runtime/operators/RegularPactTask.java      |  20 +---
 .../operators/util/RecordReaderIterator.java    |  57 ----------
 .../operators/chaining/ChainTaskTest.java       |   1 +
 .../operators/testutils/MockEnvironment.java    |  15 ++-
 7 files changed, 55 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index bb537b7..9c2efb3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -851,7 +851,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSinkTask.class);
-		vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
 		vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
 		
 		// set user code

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index b1185c2..74c625f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -37,22 +37,17 @@ import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubExcepti
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.CloseableInputProvider;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.RecordReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
- * DataSinkTask which is executed by a Flink task manager.
- * The task hands the data to an output format.
+ * DataSinkTask which is executed by a task manager. The task hands the data to an output format.
  * 
  * @see OutputFormat
  */
 public class DataSinkTask<IT> extends AbstractInvokable {
 	
-	public static final String DEGREE_OF_PARALLELISM_KEY = "sink.dop";
-	
 	// Obtain DataSinkTask Logger
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTask.class);
 
@@ -339,17 +334,10 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		
 		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
 		
-		if (this.inputTypeSerializerFactory.getDataType() == Record.class) {
-			// record specific deserialization
-			MutableReader<Record> reader = (MutableReader<Record>) inputReader;
-			this.reader = (MutableObjectIterator<IT>)new RecordReaderIterator(reader);
-		} else {
-			// generic data type serialization
-			MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-			@SuppressWarnings({ "rawtypes" })
-			final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
-			this.reader = (MutableObjectIterator<IT>)iter;
-		}
+		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+		@SuppressWarnings({ "rawtypes" })
+		final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
+		this.reader = (MutableObjectIterator<IT>)iter;
 		
 		// final sanity check
 		if (numGates != this.config.getNumInputs()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index bfd2507..2db652f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.shipping.OutputCollector;
-import org.apache.flink.runtime.operators.shipping.RecordOutputCollector;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 /**
- * DataSourceTask which is executed by a Nephele task manager. The task reads data and uses an 
+ * DataSourceTask which is executed by a task manager. The task reads data and uses an 
  * {@link InputFormat} to create records from the input.
  * 
  * @see org.apache.flink.api.common.io.InputFormat
@@ -140,80 +137,44 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 				}
 				
 				try {
-					// ======= special-case the Record, to help the JIT and avoid some casts ======
-					if (record.getClass() == Record.class) {
-						Record typedRecord = (Record) record;
-						@SuppressWarnings("unchecked")
-						final InputFormat<Record, InputSplit> inFormat = (InputFormat<Record, InputSplit>) format;
+					// special case to make the loops tighter
+					if (this.output instanceof OutputCollector) {
+						final OutputCollector<OT> output = (OutputCollector<OT>) this.output;
 						
-						if (this.output instanceof RecordOutputCollector) {
-							// Record going directly into network channels
-							final RecordOutputCollector output = (RecordOutputCollector) this.output;
-							while (!this.taskCanceled && !inFormat.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								typedRecord.clear();
-								Record returnedRecord = null;
-								if ((returnedRecord = inFormat.nextRecord(typedRecord)) != null) {
-									output.collect(returnedRecord);
-								}
-							}
-						} else if (this.output instanceof ChainedCollectorMapDriver) {
-							// Record going to a chained map task
-							@SuppressWarnings("unchecked")
-							final ChainedCollectorMapDriver<Record, ?> output = (ChainedCollectorMapDriver<Record, ?>) this.output;
+						// as long as there is data to read
+						while (!this.taskCanceled && !format.reachedEnd()) {
 							
-							// as long as there is data to read
-							while (!this.taskCanceled && !inFormat.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								typedRecord.clear();
-								if ((typedRecord = inFormat.nextRecord(typedRecord)) != null) {
-									// This is where map of UDF gets called
-									output.collect(typedRecord);
-								}
-							}
-						} else {
-							// Record going to some other chained task
-							@SuppressWarnings("unchecked")
-							final Collector<Record> output = (Collector<Record>) this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !inFormat.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								typedRecord.clear();
-								if ((typedRecord = inFormat.nextRecord(typedRecord)) != null){
-									output.collect(typedRecord);
-								}
+							OT returned;
+							if ((returned = format.nextRecord(record)) != null) {
+								output.collect(returned);
+								record = returned;
 							}
 						}
-					} else {
-						// general types. we make a case distinction here for the common cases, in order to help
-						// JIT method inlining
-						if (this.output instanceof OutputCollector) {
-							final OutputCollector<OT> output = (OutputCollector<OT>) this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !format.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								if ((record = format.nextRecord(record)) != null) {
-									output.collect(record);
-								}
-							}
-						} else if (this.output instanceof ChainedCollectorMapDriver) {
-							@SuppressWarnings("unchecked")
-							final ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !format.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								if ((record = format.nextRecord(record)) != null) {
-									output.collect(record);
-								}
+					}
+					else if (this.output instanceof ChainedCollectorMapDriver) {
+						@SuppressWarnings("unchecked")
+						final ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) this.output;
+						
+						// as long as there is data to read
+						while (!this.taskCanceled && !format.reachedEnd()) {
+							
+							OT returned;
+							if ((returned = format.nextRecord(record)) != null) {
+								output.collect(returned);
+								record = returned;
 							}
-						} else {
-							final Collector<OT> output = this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !format.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								if ((record = format.nextRecord(record)) != null) {
-									output.collect(record);
-								}
+						}
+					}
+					else {
+						final Collector<OT> output = this.output;
+						
+						// as long as there is data to read
+						while (!this.taskCanceled && !format.reachedEnd()) {
+							
+							OT returned;
+							if ((returned = format.nextRecord(record)) != null) {
+								output.collect(returned);
+								record = returned;
 							}
 						}
 					}
@@ -279,7 +240,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 	
 	/**
 	 * Initializes the InputFormat implementation and configuration.
-l	 * 
+	 * 
 	 * @throws RuntimeException
 	 *         Throws if instance of InputFormat implementation can not be
 	 *         obtained.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c1037b5..2a598ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -59,7 +59,6 @@ import org.apache.flink.runtime.operators.util.CloseableInputProvider;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.RecordReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -1031,20 +1030,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 	
 	protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) {
-
-		if (serializerFactory.getDataType().equals(Record.class)) {
-			// record specific deserialization
-			@SuppressWarnings("unchecked")
-			MutableReader<Record> reader = (MutableReader<Record>) inputReader;
-			return new RecordReaderIterator(reader);
-		} else {
-			// generic data type serialization
-			@SuppressWarnings("unchecked")
-			MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-			@SuppressWarnings({ "unchecked", "rawtypes" })
-			final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
-			return iter;
-		}
+		@SuppressWarnings("unchecked")
+		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
+		return iter;
 	}
 
 	protected int getNumTaskInputs() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
deleted file mode 100644
index d95a42c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.util;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
-* A {@link MutableObjectIterator} that wraps a Nephele Reader producing {@link Record}s.
-*/
-public final class RecordReaderIterator implements MutableObjectIterator<Record> {
-	
-	private final MutableReader<Record> reader;		// the source
-
-	/**
-	 * Creates a new iterator, wrapping the given reader.
-	 * 
-	 * @param reader The reader to wrap.
-	 */
-	public RecordReaderIterator(MutableReader<Record> reader) {
-		this.reader = reader;
-	}
-
-	@Override
-	public Record next(Record reuse) throws IOException {
-		try {
-			if (this.reader.next(reuse)) {
-				return reuse;
-			} else {
-				return null;
-			}
-		}
-		catch (InterruptedException e) {
-			throw new IOException("Reader interrupted.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 4c08ebd..0e029a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -146,6 +146,7 @@ public class ChainTaskTest extends TaskTestBase {
 				// driver
 				combineConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 				combineConfig.setDriverComparator(compFact, 0);
+				combineConfig.setDriverComparator(compFact, 1);
 				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 60a81c1..9916b61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
@@ -69,7 +70,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 
 	private final Configuration taskConfiguration;
 
-	private final List<InputGate<Record>> inputs;
+	private final List<InputGate<DeserializationDelegate<Record>>> inputs;
 
 	private final List<OutputGate> outputs;
 
@@ -83,7 +84,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
-		this.inputs = new LinkedList<InputGate<Record>>();
+		this.inputs = new LinkedList<InputGate<DeserializationDelegate<Record>>>();
 		this.outputs = new LinkedList<OutputGate>();
 
 		this.memManager = new DefaultMemoryManager(memorySize, 1);
@@ -172,7 +173,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 
 	}
 
-	private static class MockInputGate extends InputGate<Record> {
+	private static class MockInputGate extends InputGate<DeserializationDelegate<Record>> {
 		
 		private MutableObjectIterator<Record> it;
 
@@ -182,15 +183,17 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 		}
 
 		@Override
-		public void registerRecordAvailabilityListener(final RecordAvailabilityListener<Record> listener) {
+		public void registerRecordAvailabilityListener(final RecordAvailabilityListener<DeserializationDelegate<Record>> listener) {
 			super.registerRecordAvailabilityListener(listener);
 			this.notifyRecordIsAvailable(0);
 		}
 		
 		@Override
-		public InputChannelResult readRecord(Record target) throws IOException, InterruptedException {
+		public InputChannelResult readRecord(DeserializationDelegate<Record> target) throws IOException, InterruptedException {
 
-			if ((target = it.next(target)) != null) {
+			Record reuse = target != null ? target.getInstance() : null;
+			
+			if ((reuse = it.next(reuse)) != null) {
 				// everything comes from the same source channel and buffer in this mock
 				notifyRecordIsAvailable(0);
 				return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;


[3/3] incubator-flink git commit: [Scala API] Case Class serializer can work with classes that cannot be instantiated empty

Posted by se...@apache.org.
[Scala API] Case Class serializer can work with classes that cannot be instantiated empty


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/112b3a93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/112b3a93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/112b3a93

Branch: refs/heads/master
Commit: 112b3a937a1644982f24174163085c582b471f8c
Parents: 48b6d01
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 26 17:23:00 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 26 18:14:59 2014 +0100

----------------------------------------------------------------------
 .../scala/typeutils/CaseClassSerializer.scala   | 35 ++++++++++++--------
 1 file changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/112b3a93/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 61b876f..f82a67f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -33,15 +33,29 @@ abstract class CaseClassSerializer[T <: Product](
 
   @transient var fields : Array[AnyRef] = _
   
+  @transient var instanceCreationFailed : Boolean = false
   
   def createInstance: T = {
-    initArray()
-    var i = 0
-    while (i < arity) {
-      fields(i) = fieldSerializers(i).createInstance()
-      i += 1
+    if (instanceCreationFailed) {
+      null.asInstanceOf[T]
+    }
+    else {
+      initArray()
+      try {
+        var i = 0
+        while (i < arity) {
+          fields(i) = fieldSerializers(i).createInstance()
+          i += 1
+        }
+        createInstance(fields)
+      }
+      catch {
+        case t: Throwable => {
+          instanceCreationFailed = true
+          null.asInstanceOf[T]
+        }
+      }
     }
-    createInstance(fields)
   }
 
   def copy(from: T, reuse: T): T = {
@@ -68,14 +82,7 @@ abstract class CaseClassSerializer[T <: Product](
   }
 
   def deserialize(reuse: T, source: DataInputView): T = {
-    initArray()
-    var i = 0
-    while (i < arity) {
-      val field = reuse.productElement(i).asInstanceOf[AnyRef]
-      fields(i) = fieldSerializers(i).deserialize(field, source)
-      i += 1
-    }
-    createInstance(fields)
+    deserialize(source);
   }
   
   def deserialize(source: DataInputView): T = {