You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/26 18:51:25 UTC
[1/3] incubator-flink git commit: [FLINK-1286] [APIs] [runtime] Fix
serialization in CollectionInputFormat and generate meaningful error messages
Repository: incubator-flink
Updated Branches:
refs/heads/master 82f5154a9 -> 112b3a937
[FLINK-1286] [APIs] [runtime] Fix serialization in CollectionInputFormat and generate meaningful error messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d8589303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d8589303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d8589303
Branch: refs/heads/master
Commit: d85893036b9a3122900010ce975feba43b27531d
Parents: 82f5154
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 26 17:06:51 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 26 18:14:58 2014 +0100
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 24 +++++++++-
.../operators/util/UserCodeClassWrapper.java | 5 ++
.../operators/util/UserCodeObjectWrapper.java | 5 ++
.../common/operators/util/UserCodeWrapper.java | 24 ++++++----
.../api/java/io/CollectionInputFormat.java | 28 ++++++++----
.../api/java/io/CollectionInputFormatTest.java | 5 +-
.../runtime/jobgraph/InputFormatVertex.java | 45 ++++++++++++++----
.../runtime/jobgraph/OutputFormatVertex.java | 48 ++++++++++++++------
8 files changed, 137 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index eb6fe2e..bb537b7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.dag.TempMode;
@@ -835,6 +836,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
final TaskConfig config = new TaskConfig(vertex.getConfiguration());
vertex.setInvokableClass(DataSourceTask.class);
+ vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
// set user code
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
@@ -850,7 +852,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
vertex.setInvokableClass(DataSinkTask.class);
vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
-
+ vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
+
// set user code
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
config.setStubParameters(node.getPactContract().getParameters());
@@ -1426,6 +1429,25 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
}
+
+ private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+ try {
+ if (wrapper.hasObject()) {
+ try {
+ return wrapper.getUserCodeObject().toString();
+ }
+ catch (Throwable t) {
+ return wrapper.getUserCodeClass().getName();
+ }
+ }
+ else {
+ return wrapper.getUserCodeClass().getName();
+ }
+ }
+ catch (Throwable t) {
+ return null;
+ }
+ }
// -------------------------------------------------------------------------------------
// Descriptors for tasks / configurations that are chained or merged with other tasks
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
index 61fc382..1b1d5ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeClassWrapper.java
@@ -53,4 +53,9 @@ public class UserCodeClassWrapper<T> implements UserCodeWrapper<T> {
public Class<? extends T> getUserCodeClass() {
return userCodeClass;
}
+
+ @Override
+ public boolean hasObject() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
index 95a425e..f06de90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
@@ -121,4 +121,9 @@ public class UserCodeObjectWrapper<T> implements UserCodeWrapper<T> {
public Class<? extends T> getUserCodeClass() {
return (Class<? extends T>) userCodeObject.getClass();
}
+
+ @Override
+ public boolean hasObject() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
index 3d13041..032c906 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeWrapper.java
@@ -22,29 +22,26 @@ import java.io.Serializable;
import java.lang.annotation.Annotation;
/**
- * PACT contracts can have either a class or an object containing the user
+ * UDF operators can have either a class or an object containing the user
* code, this is the common interface to access them.
*/
public interface UserCodeWrapper<T> extends Serializable {
+
/**
- * Gets the user code object. In the case of a pact, that object will be the stub with the user function,
- * in the case of an input or output format, it will be the format object.
- *
+ * Gets the user code object, which may be either a function or an input or output format.
* The subclass is supposed to just return the user code object or instantiate the class.
*
* @return The class with the user code.
*/
- public T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
+ T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
/**
* Gets the user code object. In the case of a pact, that object will be the stub with the user function,
* in the case of an input or output format, it will be the format object.
*
- * The subclass is supposed to just return the user code object or instantiate the class.
- *
* @return The class with the user code.
*/
- public T getUserCodeObject();
+ T getUserCodeObject();
/**
* Gets an annotation that pertains to the user code class. By default, this method will look for
@@ -55,7 +52,7 @@ public interface UserCodeWrapper<T> extends Serializable {
* the Class object corresponding to the annotation type
* @return the annotation, or null if no annotation of the requested type was found
*/
- public <A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
+ <A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
/**
* Gets the class of the user code. If the user code is provided as a class, this class is just returned.
@@ -63,5 +60,12 @@ public interface UserCodeWrapper<T> extends Serializable {
*
* @return The class of the user code object.
*/
- public Class<? extends T> getUserCodeClass ();
+ Class<? extends T> getUserCodeClass ();
+
+ /**
+ * Checks whether the wrapper already has an object, or whether it needs to instantiate it.
+ *
+ * @return True if the wrapper already has an object, false if it has only a class.
+ */
+ boolean hasObject();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index b999ede..97e8715 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -78,11 +78,15 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
- out.writeInt(dataSet.size());
- OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
- for (T element : dataSet){
- serializer.serialize(element, wrapper);
+ final int size = dataSet.size();
+ out.writeInt(size);
+
+ if (size > 0) {
+ OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
+ for (T element : dataSet){
+ serializer.serialize(element, wrapper);
+ }
}
}
@@ -92,11 +96,17 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
- InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
- for (int i = 0; i < collectionLength; i++){
- T element = serializer.createInstance();
- element = serializer.deserialize(element, wrapper);
- list.add(element);
+ if (collectionLength > 0) {
+ try {
+ InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
+ for (int i = 0; i < collectionLength; i++){
+ T element = serializer.deserialize(wrapper);
+ list.add(element);
+ }
+ }
+ catch (Throwable t) {
+ throw new IOException("Error while deserializing element from collection", t);
+ }
}
dataSet = list;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 948d22f..64dae22 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -246,11 +246,8 @@ public class CollectionInputFormatTest {
in.readObject();
fail("should throw an exception");
}
- catch (TestException e) {
- // expected
- }
catch (Exception e) {
- fail("Exception not properly forwarded");
+ assertTrue(e.getCause() instanceof TestException);
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 8ee4da4..0ea0da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -26,8 +26,7 @@ public class InputFormatVertex extends AbstractJobVertex {
private static final long serialVersionUID = 1L;
- /** Caches the output format associated to this output vertex. */
- private transient InputFormat<?, ?> inputFormat;
+ private String formatDescription;
public InputFormatVertex(String name) {
@@ -39,19 +38,47 @@ public class InputFormatVertex extends AbstractJobVertex {
}
+ public void setFormatDescription(String formatDescription) {
+ this.formatDescription = formatDescription;
+ }
+
+ public String getFormatDescription() {
+ return formatDescription;
+ }
+
@Override
public void initializeOnMaster(ClassLoader loader) throws Exception {
- if (inputFormat == null) {
- TaskConfig cfg = new TaskConfig(getConfiguration());
- UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.getStubWrapper(loader);
-
- if (wrapper == null) {
- throw new Exception("No input format present in InputFormatVertex's task configuration.");
- }
+ final TaskConfig cfg = new TaskConfig(getConfiguration());
+
+ // deserialize from the payload
+ UserCodeWrapper<InputFormat<?, ?>> wrapper;
+ try {
+ wrapper = cfg.getStubWrapper(loader);
+ }
+ catch (Throwable t) {
+ throw new Exception("Deserializing the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
+ if (wrapper == null) {
+ throw new Exception("No input format present in InputFormatVertex's task configuration.");
+ }
+
+ // instantiate, if necessary
+ InputFormat<?, ?> inputFormat;
+ try {
inputFormat = wrapper.getUserCodeObject(InputFormat.class, loader);
+ }
+ catch (Throwable t) {
+ throw new Exception("Instantiating the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
+
+ // configure
+ try {
inputFormat.configure(cfg.getStubParameters());
}
+ catch (Throwable t) {
+ throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
setInputSplitSource(inputFormat);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8589303/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index 365ed92..708b390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -31,10 +31,8 @@ public class OutputFormatVertex extends AbstractJobVertex {
private static final long serialVersionUID = 1L;
+ private String formatDescription;
- /** Caches the output format associated to this output vertex. */
- private transient OutputFormat<?> outputFormat;
-
/**
* Creates a new task vertex with the specified name.
*
@@ -44,23 +42,45 @@ public class OutputFormatVertex extends AbstractJobVertex {
super(name);
}
+ public void setFormatDescription(String formatDescription) {
+ this.formatDescription = formatDescription;
+ }
+
+ public String getFormatDescription() {
+ return formatDescription;
+ }
@Override
public void initializeOnMaster(ClassLoader loader) throws Exception {
- if (this.outputFormat == null) {
- TaskConfig cfg = new TaskConfig(getConfiguration());
- UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+ final TaskConfig cfg = new TaskConfig(getConfiguration());
- if (wrapper == null) {
- throw new Exception("No output format present in OutputFormatVertex's task configuration.");
- }
-
- this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
- this.outputFormat.configure(cfg.getStubParameters());
+ UserCodeWrapper<OutputFormat<?>> wrapper;
+ try {
+ wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+ }
+ catch (Throwable t) {
+ throw new Exception("Deserializing the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
+ if (wrapper == null) {
+ throw new Exception("No output format present in OutputFormatVertex's task configuration.");
+ }
+
+ OutputFormat<?> outputFormat;
+ try {
+ outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
+ }
+ catch (Throwable t) {
+ throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
+ try {
+ outputFormat.configure(cfg.getStubParameters());
+ }
+ catch (Throwable t) {
+ throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
- if (this.outputFormat instanceof InitializeOnMaster) {
- ((InitializeOnMaster) this.outputFormat).initializeGlobal(getParallelism());
+ if (outputFormat instanceof InitializeOnMaster) {
+ ((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
}
}
}
[2/3] incubator-flink git commit: [FLINK-1278] [runtime] (part 1)
Remove special code paths for the Record data type in the input readers and
the source task.
Posted by se...@apache.org.
[FLINK-1278] [runtime] (part 1) Remove special code paths for the Record data type in the input readers and the source task.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/48b6d01c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/48b6d01c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/48b6d01c
Branch: refs/heads/master
Commit: 48b6d01c2ec73b10bcb3d668dd0bac4dc715f524
Parents: d858930
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 26 17:18:55 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 26 18:14:59 2014 +0100
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 1 -
.../flink/runtime/operators/DataSinkTask.java | 22 +---
.../flink/runtime/operators/DataSourceTask.java | 109 ++++++-------------
.../runtime/operators/RegularPactTask.java | 20 +---
.../operators/util/RecordReaderIterator.java | 57 ----------
.../operators/chaining/ChainTaskTest.java | 1 +
.../operators/testutils/MockEnvironment.java | 15 ++-
7 files changed, 55 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index bb537b7..9c2efb3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -851,7 +851,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
final TaskConfig config = new TaskConfig(vertex.getConfiguration());
vertex.setInvokableClass(DataSinkTask.class);
- vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
// set user code
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index b1185c2..74c625f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -37,22 +37,17 @@ import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubExcepti
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.RecordReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
/**
- * DataSinkTask which is executed by a Flink task manager.
- * The task hands the data to an output format.
+ * DataSinkTask which is executed by a task manager. The task hands the data to an output format.
*
* @see OutputFormat
*/
public class DataSinkTask<IT> extends AbstractInvokable {
- public static final String DEGREE_OF_PARALLELISM_KEY = "sink.dop";
-
// Obtain DataSinkTask Logger
private static final Logger LOG = LoggerFactory.getLogger(DataSinkTask.class);
@@ -339,17 +334,10 @@ public class DataSinkTask<IT> extends AbstractInvokable {
this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
- if (this.inputTypeSerializerFactory.getDataType() == Record.class) {
- // record specific deserialization
- MutableReader<Record> reader = (MutableReader<Record>) inputReader;
- this.reader = (MutableObjectIterator<IT>)new RecordReaderIterator(reader);
- } else {
- // generic data type serialization
- MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
- @SuppressWarnings({ "rawtypes" })
- final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
- this.reader = (MutableObjectIterator<IT>)iter;
- }
+ MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+ @SuppressWarnings({ "rawtypes" })
+ final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
+ this.reader = (MutableObjectIterator<IT>)iter;
// final sanity check
if (numGates != this.config.getNumInputs()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index bfd2507..2db652f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.shipping.OutputCollector;
-import org.apache.flink.runtime.operators.shipping.RecordOutputCollector;
import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
/**
- * DataSourceTask which is executed by a Nephele task manager. The task reads data and uses an
+ * DataSourceTask which is executed by a task manager. The task reads data and uses an
* {@link InputFormat} to create records from the input.
*
* @see org.apache.flink.api.common.io.InputFormat
@@ -140,80 +137,44 @@ public class DataSourceTask<OT> extends AbstractInvokable {
}
try {
- // ======= special-case the Record, to help the JIT and avoid some casts ======
- if (record.getClass() == Record.class) {
- Record typedRecord = (Record) record;
- @SuppressWarnings("unchecked")
- final InputFormat<Record, InputSplit> inFormat = (InputFormat<Record, InputSplit>) format;
+ // special case to make the loops tighter
+ if (this.output instanceof OutputCollector) {
+ final OutputCollector<OT> output = (OutputCollector<OT>) this.output;
- if (this.output instanceof RecordOutputCollector) {
- // Record going directly into network channels
- final RecordOutputCollector output = (RecordOutputCollector) this.output;
- while (!this.taskCanceled && !inFormat.reachedEnd()) {
- // build next pair and ship pair if it is valid
- typedRecord.clear();
- Record returnedRecord = null;
- if ((returnedRecord = inFormat.nextRecord(typedRecord)) != null) {
- output.collect(returnedRecord);
- }
- }
- } else if (this.output instanceof ChainedCollectorMapDriver) {
- // Record going to a chained map task
- @SuppressWarnings("unchecked")
- final ChainedCollectorMapDriver<Record, ?> output = (ChainedCollectorMapDriver<Record, ?>) this.output;
+ // as long as there is data to read
+ while (!this.taskCanceled && !format.reachedEnd()) {
- // as long as there is data to read
- while (!this.taskCanceled && !inFormat.reachedEnd()) {
- // build next pair and ship pair if it is valid
- typedRecord.clear();
- if ((typedRecord = inFormat.nextRecord(typedRecord)) != null) {
- // This is where map of UDF gets called
- output.collect(typedRecord);
- }
- }
- } else {
- // Record going to some other chained task
- @SuppressWarnings("unchecked")
- final Collector<Record> output = (Collector<Record>) this.output;
- // as long as there is data to read
- while (!this.taskCanceled && !inFormat.reachedEnd()) {
- // build next pair and ship pair if it is valid
- typedRecord.clear();
- if ((typedRecord = inFormat.nextRecord(typedRecord)) != null){
- output.collect(typedRecord);
- }
+ OT returned;
+ if ((returned = format.nextRecord(record)) != null) {
+ output.collect(returned);
+ record = returned;
}
}
- } else {
- // general types. we make a case distinction here for the common cases, in order to help
- // JIT method inlining
- if (this.output instanceof OutputCollector) {
- final OutputCollector<OT> output = (OutputCollector<OT>) this.output;
- // as long as there is data to read
- while (!this.taskCanceled && !format.reachedEnd()) {
- // build next pair and ship pair if it is valid
- if ((record = format.nextRecord(record)) != null) {
- output.collect(record);
- }
- }
- } else if (this.output instanceof ChainedCollectorMapDriver) {
- @SuppressWarnings("unchecked")
- final ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) this.output;
- // as long as there is data to read
- while (!this.taskCanceled && !format.reachedEnd()) {
- // build next pair and ship pair if it is valid
- if ((record = format.nextRecord(record)) != null) {
- output.collect(record);
- }
+ }
+ else if (this.output instanceof ChainedCollectorMapDriver) {
+ @SuppressWarnings("unchecked")
+ final ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) this.output;
+
+ // as long as there is data to read
+ while (!this.taskCanceled && !format.reachedEnd()) {
+
+ OT returned;
+ if ((returned = format.nextRecord(record)) != null) {
+ output.collect(returned);
+ record = returned;
}
- } else {
- final Collector<OT> output = this.output;
- // as long as there is data to read
- while (!this.taskCanceled && !format.reachedEnd()) {
- // build next pair and ship pair if it is valid
- if ((record = format.nextRecord(record)) != null) {
- output.collect(record);
- }
+ }
+ }
+ else {
+ final Collector<OT> output = this.output;
+
+ // as long as there is data to read
+ while (!this.taskCanceled && !format.reachedEnd()) {
+
+ OT returned;
+ if ((returned = format.nextRecord(record)) != null) {
+ output.collect(returned);
+ record = returned;
}
}
}
@@ -279,7 +240,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
/**
* Initializes the InputFormat implementation and configuration.
-l *
+ *
* @throws RuntimeException
* Throws if instance of InputFormat implementation can not be
* obtained.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c1037b5..2a598ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -59,7 +59,6 @@ import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.RecordReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -1031,20 +1030,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) {
-
- if (serializerFactory.getDataType().equals(Record.class)) {
- // record specific deserialization
- @SuppressWarnings("unchecked")
- MutableReader<Record> reader = (MutableReader<Record>) inputReader;
- return new RecordReaderIterator(reader);
- } else {
- // generic data type serialization
- @SuppressWarnings("unchecked")
- MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
- return iter;
- }
+ @SuppressWarnings("unchecked")
+ MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
+ return iter;
}
protected int getNumTaskInputs() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
deleted file mode 100644
index d95a42c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.util;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
-* A {@link MutableObjectIterator} that wraps a Nephele Reader producing {@link Record}s.
-*/
-public final class RecordReaderIterator implements MutableObjectIterator<Record> {
-
- private final MutableReader<Record> reader; // the source
-
- /**
- * Creates a new iterator, wrapping the given reader.
- *
- * @param reader The reader to wrap.
- */
- public RecordReaderIterator(MutableReader<Record> reader) {
- this.reader = reader;
- }
-
- @Override
- public Record next(Record reuse) throws IOException {
- try {
- if (this.reader.next(reuse)) {
- return reuse;
- } else {
- return null;
- }
- }
- catch (InterruptedException e) {
- throw new IOException("Reader interrupted.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 4c08ebd..0e029a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -146,6 +146,7 @@ public class ChainTaskTest extends TaskTestBase {
// driver
combineConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
combineConfig.setDriverComparator(compFact, 0);
+ combineConfig.setDriverComparator(compFact, 1);
combineConfig.setRelativeMemoryDriver(memoryFraction);
// udf
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 60a81c1..9916b61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
@@ -69,7 +70,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
private final Configuration taskConfiguration;
- private final List<InputGate<Record>> inputs;
+ private final List<InputGate<DeserializationDelegate<Record>>> inputs;
private final List<OutputGate> outputs;
@@ -83,7 +84,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.jobConfiguration = new Configuration();
this.taskConfiguration = new Configuration();
- this.inputs = new LinkedList<InputGate<Record>>();
+ this.inputs = new LinkedList<InputGate<DeserializationDelegate<Record>>>();
this.outputs = new LinkedList<OutputGate>();
this.memManager = new DefaultMemoryManager(memorySize, 1);
@@ -172,7 +173,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
}
- private static class MockInputGate extends InputGate<Record> {
+ private static class MockInputGate extends InputGate<DeserializationDelegate<Record>> {
private MutableObjectIterator<Record> it;
@@ -182,15 +183,17 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
}
@Override
- public void registerRecordAvailabilityListener(final RecordAvailabilityListener<Record> listener) {
+ public void registerRecordAvailabilityListener(final RecordAvailabilityListener<DeserializationDelegate<Record>> listener) {
super.registerRecordAvailabilityListener(listener);
this.notifyRecordIsAvailable(0);
}
@Override
- public InputChannelResult readRecord(Record target) throws IOException, InterruptedException {
+ public InputChannelResult readRecord(DeserializationDelegate<Record> target) throws IOException, InterruptedException {
- if ((target = it.next(target)) != null) {
+ Record reuse = target != null ? target.getInstance() : null;
+
+ if ((reuse = it.next(reuse)) != null) {
// everything comes from the same source channel and buffer in this mock
notifyRecordIsAvailable(0);
return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
[3/3] incubator-flink git commit: [Scala API] Case Class serializer
can work with classes that cannot be instantiated empty
Posted by se...@apache.org.
[Scala API] Case Class serializer can work with classes that cannot be instantiated empty
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/112b3a93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/112b3a93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/112b3a93
Branch: refs/heads/master
Commit: 112b3a937a1644982f24174163085c582b471f8c
Parents: 48b6d01
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 26 17:23:00 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 26 18:14:59 2014 +0100
----------------------------------------------------------------------
.../scala/typeutils/CaseClassSerializer.scala | 35 ++++++++++++--------
1 file changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/112b3a93/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 61b876f..f82a67f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -33,15 +33,29 @@ abstract class CaseClassSerializer[T <: Product](
@transient var fields : Array[AnyRef] = _
+ @transient var instanceCreationFailed : Boolean = false
def createInstance: T = {
- initArray()
- var i = 0
- while (i < arity) {
- fields(i) = fieldSerializers(i).createInstance()
- i += 1
+ if (instanceCreationFailed) {
+ null.asInstanceOf[T]
+ }
+ else {
+ initArray()
+ try {
+ var i = 0
+ while (i < arity) {
+ fields(i) = fieldSerializers(i).createInstance()
+ i += 1
+ }
+ createInstance(fields)
+ }
+ catch {
+ case t: Throwable => {
+ instanceCreationFailed = true
+ null.asInstanceOf[T]
+ }
+ }
}
- createInstance(fields)
}
def copy(from: T, reuse: T): T = {
@@ -68,14 +82,7 @@ abstract class CaseClassSerializer[T <: Product](
}
def deserialize(reuse: T, source: DataInputView): T = {
- initArray()
- var i = 0
- while (i < arity) {
- val field = reuse.productElement(i).asInstanceOf[AnyRef]
- fields(i) = fieldSerializers(i).deserialize(field, source)
- i += 1
- }
- createInstance(fields)
+ deserialize(source);
}
def deserialize(source: DataInputView): T = {