You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hs...@apache.org on 2015/10/06 17:14:22 UTC
[2/5] flink git commit: [FLINK-2815] [REFACTOR] Remove Pact from
class and file names since it is no longer a valid reference
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index c6a872c..988e903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -55,14 +55,14 @@ import java.util.List;
* @param <IN> The data type consumed by the combiner.
* @param <OUT> The data type produced by the combiner.
*/
-public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
+public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> {
private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);
/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
- private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
+ private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
private InMemorySorter<IN> sorter;
@@ -87,7 +87,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
+ public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 59fb603..a03e42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -40,11 +40,11 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.common.functions.GroupReduceFunction
*/
-public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
+public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(GroupReduceDriver.class);
- private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
+ private TaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
private MutableObjectIterator<IT> input;
@@ -59,7 +59,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
+ public void setup(TaskContext<GroupReduceFunction<IT, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index 811f00c..7a9c8e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -46,11 +46,11 @@ import org.slf4j.LoggerFactory;
*
* @see org.apache.flink.api.common.functions.FlatJoinFunction
*/
-public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
protected static final Logger LOG = LoggerFactory.getLogger(JoinDriver.class);
- protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+ protected TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
private volatile JoinTaskIterator<IT1, IT2, OT> joinIterator; // the iterator that does the actual join
@@ -59,7 +59,7 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index fe926cb..51f9197 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -27,15 +27,15 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
+import org.apache.flink.runtime.iterative.task.AbstractIterativeTask;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+ private TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT1> hashTable;
@@ -55,7 +55,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -99,8 +99,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
final TypeComparator<IT1> solutionSetComparator;
// grab a handle to the hash table from the iteration broker
- if (taskContext instanceof AbstractIterativePactTask) {
- AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
+ if (taskContext instanceof AbstractIterativeTask) {
+ AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext;
String identifier = iterativeTaskContext.brokerKey();
Object table = SolutionSetBroker.instance().get(identifier);
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 20079fc..e1fad47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -27,15 +27,15 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
+import org.apache.flink.runtime.iterative.task.AbstractIterativeTask;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+ private TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT2> hashTable;
@@ -55,7 +55,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -99,8 +99,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
final TypeComparator<IT2> solutionSetComparator;
// grab a handle to the hash table from the iteration broker
- if (taskContext instanceof AbstractIterativePactTask) {
- AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
+ if (taskContext instanceof AbstractIterativeTask) {
+ AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext;
String identifier = iterativeTaskContext.brokerKey();
Object table = SolutionSetBroker.instance().get(identifier);
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index d861cbd..eefe8e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -36,9 +36,9 @@ import org.apache.flink.util.MutableObjectIterator;
* @param <IT> The mapper's input data type.
* @param <OT> The mapper's output data type.
*/
-public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
+public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
- private PactTaskContext<MapFunction<IT, OT>, OT> taskContext;
+ private TaskContext<MapFunction<IT, OT>, OT> taskContext;
private volatile boolean running;
@@ -46,7 +46,7 @@ public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
@Override
- public void setup(PactTaskContext<MapFunction<IT, OT>, OT> context) {
+ public void setup(TaskContext<MapFunction<IT, OT>, OT> context) {
this.taskContext = context;
this.running = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
index eaab904..8792ef1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -41,16 +41,16 @@ import org.slf4j.LoggerFactory;
* @param <IT> The mapper's input data type.
* @param <OT> The mapper's output data type.
*/
-public class MapPartitionDriver<IT, OT> implements PactDriver<MapPartitionFunction<IT, OT>, OT> {
+public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(MapPartitionDriver.class);
- private PactTaskContext<MapPartitionFunction<IT, OT>, OT> taskContext;
+ private TaskContext<MapPartitionFunction<IT, OT>, OT> taskContext;
private boolean objectReuseEnabled = false;
@Override
- public void setup(PactTaskContext<MapPartitionFunction<IT, OT>, OT> context) {
+ public void setup(TaskContext<MapPartitionFunction<IT, OT>, OT> context) {
this.taskContext = context;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index 1fb4813..fcd2716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -32,18 +32,18 @@ import org.slf4j.LoggerFactory;
*
* @param <T> The data type.
*/
-public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> {
+public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
private static final Logger LOG = LoggerFactory.getLogger(MapPartitionDriver.class);
- private PactTaskContext<AbstractRichFunction, T> taskContext;
+ private TaskContext<AbstractRichFunction, T> taskContext;
private volatile boolean running;
private boolean objectReuseEnabled = false;
@Override
- public void setup(PactTaskContext<AbstractRichFunction, T> context) {
+ public void setup(TaskContext<AbstractRichFunction, T> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
deleted file mode 100644
index 288f7ca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
- * A driver implements the actual code to perform a batch operation, like <i>map()</i>,
- * <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
- *
- * @see PactTaskContext
- *
- * @param <S> The type of stub driven by this driver.
- * @param <OT> The data type of the records produced by this driver.
- */
-public interface PactDriver<S extends Function, OT> {
-
- void setup(PactTaskContext<S, OT> context);
-
- /**
- * Gets the number of inputs that the task has.
- *
- * @return The number of inputs.
- */
- int getNumberOfInputs();
-
- /**
- * Gets the number of comparators required for this driver.
- *
- * @return The number of comparators required for this driver.
- */
- int getNumberOfDriverComparators();
-
- /**
- * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return
- * <code>MapFunction.class</code>.
- *
- * @return The class of the stub type run by the task.
- */
- Class<S> getStubType();
-
- /**
- * This method is called before the user code is opened. An exception thrown by this method
- * signals failure of the task.
- *
- * @throws Exception Exceptions may be forwarded and signal task failure.
- */
- void prepare() throws Exception;
-
- /**
- * The main operation method of the task. It should call the user code with the data subsets until
- * the input is depleted.
- *
- * @throws Exception Any exception thrown by this method signals task failure. Because exceptions in the user
- * code typically signal situations where this instance in unable to proceed, exceptions
- * from the user code should be forwarded.
- */
- void run() throws Exception;
-
- /**
- * This method is invoked in any case (clean termination and exception) at the end of the tasks operation.
- *
- * @throws Exception Exceptions may be forwarded.
- */
- void cleanup() throws Exception;
-
- /**
- * This method is invoked when the driver must aborted in mid processing. It is invoked asynchronously by a different thread.
- *
- * @throws Exception Exceptions may be forwarded.
- */
- void cancel() throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
deleted file mode 100644
index baeda3a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
- * the runtime components and configuration that they can use to fulfil their task.
- *
- * @param <S> The UDF type.
- * @param <OT> The produced data type.
- *
- * @see PactDriver
- */
-public interface PactTaskContext<S, OT> {
-
- TaskConfig getTaskConfig();
-
- TaskManagerRuntimeInfo getTaskManagerInfo();
-
- ClassLoader getUserCodeClassLoader();
-
- MemoryManager getMemoryManager();
-
- IOManager getIOManager();
-
- <X> MutableObjectIterator<X> getInput(int index);
-
- <X> TypeSerializerFactory<X> getInputSerializer(int index);
-
- <X> TypeComparator<X> getDriverComparator(int index);
-
- S getStub();
-
- ExecutionConfig getExecutionConfig();
-
- Collector<OT> getOutputCollector();
-
- AbstractInvokable getOwningNepheleTask();
-
- String formatLogString(String message);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index f990156..c77e746 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @param <T> The data type consumed and produced by the combiner.
*/
-public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> {
+public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class);
@@ -53,7 +53,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
- private PactTaskContext<ReduceFunction<T>, T> taskContext;
+ private TaskContext<ReduceFunction<T>, T> taskContext;
private TypeSerializer<T> serializer;
@@ -77,7 +77,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
+ public void setup(TaskContext<ReduceFunction<T>, T> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 8d15ef2..395beab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -39,11 +39,11 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.common.functions.ReduceFunction
*/
-public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
+public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
private static final Logger LOG = LoggerFactory.getLogger(ReduceDriver.class);
- private PactTaskContext<ReduceFunction<T>, T> taskContext;
+ private TaskContext<ReduceFunction<T>, T> taskContext;
private MutableObjectIterator<T> input;
@@ -58,7 +58,7 @@ public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
+ public void setup(TaskContext<ReduceFunction<T>, T> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
deleted file mode 100644
index 89963af..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ /dev/null
@@ -1,1499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.chaining.ChainedDriver;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
-import org.apache.flink.runtime.operators.shipping.OutputCollector;
-import org.apache.flink.runtime.operators.shipping.OutputEmitter;
-import org.apache.flink.runtime.operators.shipping.RecordOutputCollector;
-import org.apache.flink.runtime.operators.shipping.RecordOutputEmitter;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The base class for all tasks. Encapsulated common behavior and implements the main life-cycle
- * of the user code.
- */
-public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(RegularPactTask.class);
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * The driver that invokes the user code (the stub implementation). The central driver in this task
- * (further drivers may be chained behind this driver).
- */
- protected volatile PactDriver<S, OT> driver;
-
- /**
- * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
- */
- protected S stub;
-
- /**
- * The udf's runtime context.
- */
- protected DistributedRuntimeUDFContext runtimeUdfContext;
-
- /**
- * The collector that forwards the user code's results. May forward to a channel or to chained drivers within
- * this task.
- */
- protected Collector<OT> output;
-
- /**
- * The output writers for the data that this task forwards to the next task. The latest driver (the central, if no chained
- * drivers exist, otherwise the last chained driver) produces its output to these writers.
- */
- protected List<RecordWriter<?>> eventualOutputs;
-
- /**
- * The input readers of this task.
- */
- protected MutableReader<?>[] inputReaders;
-
- /**
- * The input readers for the configured broadcast variables for this task.
- */
- protected MutableReader<?>[] broadcastInputReaders;
-
- /**
- * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc...
- */
- protected MutableObjectIterator<?>[] inputIterators;
-
- /**
- * The indices of the iterative inputs. Empty, if the task is not iterative.
- */
- protected int[] iterativeInputs;
-
- /**
- * The indices of the iterative broadcast inputs. Empty, if non of the inputs is iteratve.
- */
- protected int[] iterativeBroadcastInputs;
-
- /**
- * The local strategies that are applied on the inputs.
- */
- protected volatile CloseableInputProvider<?>[] localStrategies;
-
- /**
- * The optional temp barriers on the inputs for dead-lock breaking. Are
- * optionally resettable.
- */
- protected volatile TempBarrier<?>[] tempBarriers;
-
- /**
- * The resettable inputs in the case where no temp barrier is needed.
- */
- protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs;
-
- /**
- * The inputs to the operator. Return the readers' data after the application of the local strategy
- * and the temp-table barrier.
- */
- protected MutableObjectIterator<?>[] inputs;
-
- /**
- * The serializers for the input data type.
- */
- protected TypeSerializerFactory<?>[] inputSerializers;
-
- /**
- * The serializers for the broadcast input data types.
- */
- protected TypeSerializerFactory<?>[] broadcastInputSerializers;
-
- /**
- * The comparators for the central driver.
- */
- protected TypeComparator<?>[] inputComparators;
-
- /**
- * The task configuration with the setup parameters.
- */
- protected TaskConfig config;
-
- /**
- * A list of chained drivers, if there are any.
- */
- protected ArrayList<ChainedDriver<?, ?>> chainedTasks;
-
- /**
- * Certain inputs may be excluded from resetting. For example, the initial partial solution
- * in an iteration head must not be reseted (it is read through the back channel), when all
- * others are reseted.
- */
- private boolean[] excludeFromReset;
-
- /**
- * Flag indicating for each input whether it is cached and can be reseted.
- */
- private boolean[] inputIsCached;
-
- /**
- * flag indicating for each input whether it must be asynchronously materialized.
- */
- private boolean[] inputIsAsyncMaterialized;
-
- /**
- * The amount of memory per input that is dedicated to the materialization.
- */
- private int[] materializationMemory;
-
- /**
- * The flag that tags the task as still running. Checked periodically to abort processing.
- */
- protected volatile boolean running = true;
-
- /**
- * The accumulator map used in the RuntimeContext.
- */
- protected Map<String, Accumulator<?,?>> accumulatorMap;
-
- // --------------------------------------------------------------------------------------------
- // Task Interface
- // --------------------------------------------------------------------------------------------
-
-
- /**
- * Initialization method. Runs in the execution graph setup phase in the JobManager
- * and as a setup method on the TaskManager.
- */
- @Override
- public void registerInputOutput() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Start registering input and output."));
- }
-
- // obtain task configuration (including stub parameters)
- Configuration taskConf = getTaskConfiguration();
- this.config = new TaskConfig(taskConf);
-
- // now get the operator class which drives the operation
- final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
- this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
-
- // initialize the readers.
- // this does not yet trigger any stream consuming or processing.
- initInputReaders();
- initBroadcastInputReaders();
-
- // initialize the writers.
- initOutputs();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Finished registering input and output."));
- }
- }
-
-
- /**
- * The main work method.
- */
- @Override
- public void invoke() throws Exception {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Start task code."));
- }
-
- Environment env = getEnvironment();
-
- this.runtimeUdfContext = createRuntimeContext(env.getTaskName());
-
- // whatever happens in this scope, make sure that the local strategies are cleaned up!
- // note that the initialization of the local strategies is in the try-finally block as well,
- // so that the thread that creates them catches its own errors that may happen in that process.
- // this is especially important, since there may be asynchronous closes (such as through canceling).
- try {
- // initialize the remaining data structures on the input and trigger the local processing
- // the local processing includes building the dams / caches
- try {
- int numInputs = driver.getNumberOfInputs();
- int numComparators = driver.getNumberOfDriverComparators();
- int numBroadcastInputs = this.config.getNumBroadcastInputs();
-
- initInputsSerializersAndComparators(numInputs, numComparators);
- initBroadcastInputsSerializers(numBroadcastInputs);
-
- // set the iterative status for inputs and broadcast inputs
- {
- List<Integer> iterativeInputs = new ArrayList<Integer>();
-
- for (int i = 0; i < numInputs; i++) {
- final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i);
-
- if (numberOfEventsUntilInterrupt < 0) {
- throw new IllegalArgumentException();
- }
- else if (numberOfEventsUntilInterrupt > 0) {
- this.inputReaders[i].setIterativeReader();
- iterativeInputs.add(i);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Input [" + i + "] reads in supersteps with [" +
- + numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
- }
- }
- }
- this.iterativeInputs = asArray(iterativeInputs);
- }
-
- {
- List<Integer> iterativeBcInputs = new ArrayList<Integer>();
-
- for (int i = 0; i < numBroadcastInputs; i++) {
- final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeBroadcastGate(i);
-
- if (numberOfEventsUntilInterrupt < 0) {
- throw new IllegalArgumentException();
- }
- else if (numberOfEventsUntilInterrupt > 0) {
- this.broadcastInputReaders[i].setIterativeReader();
- iterativeBcInputs.add(i);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Broadcast input [" + i + "] reads in supersteps with [" +
- + numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
- }
- }
- }
- this.iterativeBroadcastInputs = asArray(iterativeBcInputs);
- }
-
- initLocalStrategies(numInputs);
- }
- catch (Exception e) {
- throw new RuntimeException("Initializing the input processing failed" +
- (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
- }
-
- if (!this.running) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Task cancelled before task code was started."));
- }
- return;
- }
-
- // pre main-function initialization
- initialize();
-
- // read the broadcast variables. they will be released in the finally clause
- for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
- final String name = this.config.getBroadcastInputName(i);
- readAndSetBroadcastInput(i, name, this.runtimeUdfContext, 1 /* superstep one for the start */);
- }
-
- // the work goes here
- run();
- }
- finally {
- // clean up in any case!
- closeLocalStrategiesAndCaches();
-
- clearReaders(inputReaders);
- clearWriters(eventualOutputs);
-
- }
-
- if (this.running) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Finished task code."));
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Task code cancelled."));
- }
- }
- }
-
- @Override
- public void cancel() throws Exception {
- this.running = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Cancelling task code"));
- }
-
- try {
- if (this.driver != null) {
- this.driver.cancel();
- }
- } finally {
- closeLocalStrategiesAndCaches();
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Main Work Methods
- // --------------------------------------------------------------------------------------------
-
- protected void initialize() throws Exception {
- // create the operator
- try {
- this.driver.setup(this);
- }
- catch (Throwable t) {
- throw new Exception("The driver setup for '" + this.getEnvironment().getTaskName() +
- "' , caused an error: " + t.getMessage(), t);
- }
-
- // instantiate the UDF
- try {
- final Class<? super S> userCodeFunctionType = this.driver.getStubType();
- // if the class is null, the driver has no user code
- if (userCodeFunctionType != null) {
- this.stub = initStub(userCodeFunctionType);
- }
- } catch (Exception e) {
- throw new RuntimeException("Initializing the UDF" +
- (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
- }
- }
-
- protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) throws IOException {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Setting broadcast variable '" + bcVarName + "'" +
- (superstep > 1 ? ", superstep " + superstep : "")));
- }
-
- @SuppressWarnings("unchecked")
- final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.broadcastInputSerializers[inputNum];
-
- final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
-
- BroadcastVariableMaterialization<X, ?> variable = getEnvironment().getBroadcastVariableManager().materializeBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
- context.setBroadcastVariable(bcVarName, variable);
- }
-
- protected void releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Releasing broadcast variable '" + bcVarName + "'" +
- (superstep > 1 ? ", superstep " + superstep : "")));
- }
-
- getEnvironment().getBroadcastVariableManager().releaseReference(bcVarName, superstep, this);
- context.clearBroadcastVariable(bcVarName);
- }
-
-
- protected void run() throws Exception {
- // ---------------------------- Now, the actual processing starts ------------------------
- // check for asynchronous canceling
- if (!this.running) {
- return;
- }
-
- boolean stubOpen = false;
-
- try {
- // run the data preparation
- try {
- this.driver.prepare();
- }
- catch (Throwable t) {
- // if the preparation caused an error, clean up
- // errors during clean-up are swallowed, because we have already a root exception
- throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskName() +
- "' , caused an error: " + t.getMessage(), t);
- }
-
- // check for canceling
- if (!this.running) {
- return;
- }
-
- // start all chained tasks
- RegularPactTask.openChainedTasks(this.chainedTasks, this);
-
- // open stub implementation
- if (this.stub != null) {
- try {
- Configuration stubConfig = this.config.getStubParameters();
- FunctionUtils.openFunction(this.stub, stubConfig);
- stubOpen = true;
- }
- catch (Throwable t) {
- throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
- }
- }
-
- // run the user code
- this.driver.run();
-
- // close. We close here such that a regular close throwing an exception marks a task as failed.
- if (this.running && this.stub != null) {
- FunctionUtils.closeFunction(this.stub);
- stubOpen = false;
- }
-
- this.output.close();
-
- // close all chained tasks letting them report failure
- RegularPactTask.closeChainedTasks(this.chainedTasks, this);
- }
- catch (Exception ex) {
- // close the input, but do not report any exceptions, since we already have another root cause
- if (stubOpen) {
- try {
- FunctionUtils.closeFunction(this.stub);
- }
- catch (Throwable t) {
- // do nothing
- }
- }
-
- // if resettable driver invoke teardown
- if (this.driver instanceof ResettablePactDriver) {
- final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
- try {
- resDriver.teardown();
- } catch (Throwable t) {
- throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
- }
- }
-
- RegularPactTask.cancelChainedTasks(this.chainedTasks);
-
- ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
-
- if (ex instanceof CancelTaskException) {
- // forward canceling exception
- throw ex;
- }
- else if (this.running) {
- // throw only if task was not cancelled. in the case of canceling, exceptions are expected
- RegularPactTask.logAndThrowException(ex, this);
- }
- }
- finally {
- this.driver.cleanup();
- }
- }
-
- protected void closeLocalStrategiesAndCaches() {
-
- // make sure that all broadcast variable references held by this task are released
- if (LOG.isDebugEnabled()) {
- LOG.debug(formatLogString("Releasing all broadcast variables."));
- }
-
- getEnvironment().getBroadcastVariableManager().releaseAllReferencesFromTask(this);
- if (runtimeUdfContext != null) {
- runtimeUdfContext.clearAllBroadcastVariables();
- }
-
- // clean all local strategies and caches/pipeline breakers.
-
- if (this.localStrategies != null) {
- for (int i = 0; i < this.localStrategies.length; i++) {
- if (this.localStrategies[i] != null) {
- try {
- this.localStrategies[i].close();
- } catch (Throwable t) {
- LOG.error("Error closing local strategy for input " + i, t);
- }
- }
- }
- }
- if (this.tempBarriers != null) {
- for (int i = 0; i < this.tempBarriers.length; i++) {
- if (this.tempBarriers[i] != null) {
- try {
- this.tempBarriers[i].close();
- } catch (Throwable t) {
- LOG.error("Error closing temp barrier for input " + i, t);
- }
- }
- }
- }
- if (this.resettableInputs != null) {
- for (int i = 0; i < this.resettableInputs.length; i++) {
- if (this.resettableInputs[i] != null) {
- try {
- this.resettableInputs[i].close();
- } catch (Throwable t) {
- LOG.error("Error closing cache for input " + i, t);
- }
- }
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Task Setup and Teardown
- // --------------------------------------------------------------------------------------------
-
- /**
- * @return the last output collector in the collector chain
- */
- @SuppressWarnings("unchecked")
- protected Collector<OT> getLastOutputCollector() {
- int numChained = this.chainedTasks.size();
- return (numChained == 0) ? output : (Collector<OT>) chainedTasks.get(numChained - 1).getOutputCollector();
- }
-
- /**
- * Sets the last output {@link Collector} of the collector chain of this {@link RegularPactTask}.
- * <p>
- * In case of chained tasks, the output collector of the last {@link ChainedDriver} is set. Otherwise it is the
- * single collector of the {@link RegularPactTask}.
- *
- * @param newOutputCollector new output collector to set as last collector
- */
- protected void setLastOutputCollector(Collector<OT> newOutputCollector) {
- int numChained = this.chainedTasks.size();
-
- if (numChained == 0) {
- output = newOutputCollector;
- return;
- }
-
- chainedTasks.get(numChained - 1).setOutputCollector(newOutputCollector);
- }
-
- public TaskConfig getLastTasksConfig() {
- int numChained = this.chainedTasks.size();
- return (numChained == 0) ? config : chainedTasks.get(numChained - 1).getTaskConfig();
- }
-
- protected S initStub(Class<? super S> stubSuperClass) throws Exception {
- try {
- ClassLoader userCodeClassLoader = getUserCodeClassLoader();
- S stub = config.<S>getStubWrapper(userCodeClassLoader).getUserCodeObject(stubSuperClass, userCodeClassLoader);
- // check if the class is a subclass, if the check is required
- if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
- throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
- stubSuperClass.getName() + "' as is required.");
- }
- FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
- return stub;
- }
- catch (ClassCastException ccex) {
- throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
- }
- }
-
- /**
- * Creates the record readers for the number of inputs as defined by {@link #getNumTaskInputs()}.
- *
- * This method requires that the task configuration, the driver, and the user-code class loader are set.
- */
- protected void initInputReaders() throws Exception {
- final int numInputs = getNumTaskInputs();
- final MutableReader<?>[] inputReaders = new MutableReader<?>[numInputs];
-
- int currentReaderOffset = 0;
-
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-
- for (int i = 0; i < numInputs; i++) {
- // ---------------- create the input readers ---------------------
- // in case where a logical input unions multiple physical inputs, create a union reader
- final int groupSize = this.config.getGroupSize(i);
-
- if (groupSize == 1) {
- // non-union case
- inputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
- } else if (groupSize > 1){
- // union case
- InputGate[] readers = new InputGate[groupSize];
- for (int j = 0; j < groupSize; ++j) {
- readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
- }
- inputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
- } else {
- throw new Exception("Illegal input group size in task configuration: " + groupSize);
- }
-
- inputReaders[i].setReporter(reporter);
-
- currentReaderOffset += groupSize;
- }
- this.inputReaders = inputReaders;
-
- // final sanity check
- if (currentReaderOffset != this.config.getNumInputs()) {
- throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
- }
- }
-
- /**
- * Creates the record readers for the extra broadcast inputs as configured by {@link TaskConfig#getNumBroadcastInputs()}.
- *
- * This method requires that the task configuration, the driver, and the user-code class loader are set.
- */
- protected void initBroadcastInputReaders() throws Exception {
- final int numBroadcastInputs = this.config.getNumBroadcastInputs();
- final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];
-
- int currentReaderOffset = config.getNumInputs();
-
- for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
- // ---------------- create the input readers ---------------------
- // in case where a logical input unions multiple physical inputs, create a union reader
- final int groupSize = this.config.getBroadcastGroupSize(i);
- if (groupSize == 1) {
- // non-union case
- broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
- } else if (groupSize > 1){
- // union case
- InputGate[] readers = new InputGate[groupSize];
- for (int j = 0; j < groupSize; ++j) {
- readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
- }
- broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
- } else {
- throw new Exception("Illegal input group size in task configuration: " + groupSize);
- }
-
- currentReaderOffset += groupSize;
- }
- this.broadcastInputReaders = broadcastInputReaders;
- }
-
- /**
- * Creates all the serializers and comparators.
- */
- protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
- this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
- this.inputComparators = numComparators > 0 ? new TypeComparator<?>[numComparators] : null;
- this.inputIterators = new MutableObjectIterator<?>[numInputs];
-
- ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-
- for (int i = 0; i < numInputs; i++) {
-
- final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, userCodeClassLoader);
- this.inputSerializers[i] = serializerFactory;
-
- this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
- }
-
- // ---------------- create the driver's comparators ---------------------
- for (int i = 0; i < numComparators; i++) {
-
- if (this.inputComparators != null) {
- final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, userCodeClassLoader);
- this.inputComparators[i] = comparatorFactory.createComparator();
- }
- }
- }
-
- /**
- * Creates all the serializers and iterators for the broadcast inputs.
- */
- protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
- this.broadcastInputSerializers = new TypeSerializerFactory<?>[numBroadcastInputs];
-
- ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-
- for (int i = 0; i < numBroadcastInputs; i++) {
- // ---------------- create the serializer first ---------------------
- final TypeSerializerFactory<?> serializerFactory = this.config.getBroadcastInputSerializer(i, userCodeClassLoader);
- this.broadcastInputSerializers[i] = serializerFactory;
- }
- }
-
- /**
- *
- * NOTE: This method must be invoked after the invocation of {@code #initInputReaders()} and
- * {@code #initInputSerializersAndComparators(int)}!
- *
- * @param numInputs
- */
- protected void initLocalStrategies(int numInputs) throws Exception {
-
- final MemoryManager memMan = getMemoryManager();
- final IOManager ioMan = getIOManager();
-
- this.localStrategies = new CloseableInputProvider<?>[numInputs];
- this.inputs = new MutableObjectIterator<?>[numInputs];
- this.excludeFromReset = new boolean[numInputs];
- this.inputIsCached = new boolean[numInputs];
- this.inputIsAsyncMaterialized = new boolean[numInputs];
- this.materializationMemory = new int[numInputs];
-
- // set up the local strategies first, such that the can work before any temp barrier is created
- for (int i = 0; i < numInputs; i++) {
- initInputLocalStrategy(i);
- }
-
- // we do another loop over the inputs, because we want to instantiate all
- // sorters, etc before requesting the first input (as this call may block)
-
- // we have two types of materialized inputs, and both are replayable (can act as a cache)
- // The first variant materializes in a different thread and hence
- // acts as a pipeline breaker. this one should only be there, if a pipeline breaker is needed.
- // the second variant spills to the side and will not read unless the result is also consumed
- // in a pipelined fashion.
- this.resettableInputs = new SpillingResettableMutableObjectIterator<?>[numInputs];
- this.tempBarriers = new TempBarrier<?>[numInputs];
-
- for (int i = 0; i < numInputs; i++) {
- final int memoryPages;
- final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
- final boolean cached = this.config.isInputCached(i);
-
- this.inputIsAsyncMaterialized[i] = async;
- this.inputIsCached[i] = cached;
-
- if (async || cached) {
- memoryPages = memMan.computeNumberOfPages(this.config.getRelativeInputMaterializationMemory(i));
- if (memoryPages <= 0) {
- throw new Exception("Input marked as materialized/cached, but no memory for materialization provided.");
- }
- this.materializationMemory[i] = memoryPages;
- } else {
- memoryPages = 0;
- }
-
- if (async) {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- TempBarrier<?> barrier = new TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, memoryPages);
- barrier.startReading();
- this.tempBarriers[i] = barrier;
- this.inputs[i] = null;
- } else if (cached) {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- SpillingResettableMutableObjectIterator<?> iter = new SpillingResettableMutableObjectIterator(
- getInput(i), this.inputSerializers[i].getSerializer(), getMemoryManager(), getIOManager(), memoryPages, this);
- this.resettableInputs[i] = iter;
- this.inputs[i] = iter;
- }
- }
- }
-
- protected void resetAllInputs() throws Exception {
-
- // first we need to make sure that caches consume remaining data
- // NOTE: we need to do this before closing the local strategies
- for (int i = 0; i < this.inputs.length; i++) {
-
- if (this.inputIsCached[i] && this.resettableInputs[i] != null) {
- this.resettableInputs[i].consumeAndCacheRemainingData();
- }
- }
-
- // close all local-strategies. they will either get re-initialized, or we have
- // read them now and their data is cached
- for (int i = 0; i < this.localStrategies.length; i++) {
- if (this.localStrategies[i] != null) {
- this.localStrategies[i].close();
- this.localStrategies[i] = null;
- }
- }
-
- final MemoryManager memMan = getMemoryManager();
- final IOManager ioMan = getIOManager();
-
- // reset the caches, or re-run the input local strategy
- for (int i = 0; i < this.inputs.length; i++) {
- if (this.excludeFromReset[i]) {
- if (this.tempBarriers[i] != null) {
- this.tempBarriers[i].close();
- this.tempBarriers[i] = null;
- } else if (this.resettableInputs[i] != null) {
- this.resettableInputs[i].close();
- this.resettableInputs[i] = null;
- }
- } else {
- // make sure the input is not available directly, but are lazily fetched again
- this.inputs[i] = null;
-
- if (this.inputIsCached[i]) {
- if (this.tempBarriers[i] != null) {
- this.inputs[i] = this.tempBarriers[i].getIterator();
- } else if (this.resettableInputs[i] != null) {
- this.resettableInputs[i].reset();
- this.inputs[i] = this.resettableInputs[i];
- } else {
- throw new RuntimeException("Found a resettable input, but no temp barrier and no resettable iterator.");
- }
- } else {
- // close the async barrier if there is one
- if (this.tempBarriers[i] != null) {
- this.tempBarriers[i].close();
- }
-
- // recreate the local strategy
- initInputLocalStrategy(i);
-
- if (this.inputIsAsyncMaterialized[i]) {
- final int pages = this.materializationMemory[i];
- @SuppressWarnings({ "unchecked", "rawtypes" })
- TempBarrier<?> barrier = new TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, pages);
- barrier.startReading();
- this.tempBarriers[i] = barrier;
- this.inputs[i] = null;
- }
- }
- }
- }
- }
-
- protected void excludeFromReset(int inputNum) {
- this.excludeFromReset[inputNum] = true;
- }
-
- private void initInputLocalStrategy(int inputNum) throws Exception {
- // check if there is already a strategy
- if (this.localStrategies[inputNum] != null) {
- throw new IllegalStateException();
- }
-
- // now set up the local strategy
- final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
- if (localStrategy != null) {
- switch (localStrategy) {
- case NONE:
- // the input is as it is
- this.inputs[inputNum] = this.inputIterators[inputNum];
- break;
- case SORT:
- @SuppressWarnings({ "rawtypes", "unchecked" })
- UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
- this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
- this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
- this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
- // set the input to null such that it will be lazily fetched from the input strategy
- this.inputs[inputNum] = null;
- this.localStrategies[inputNum] = sorter;
- break;
- case COMBININGSORT:
- // sanity check this special case!
- // this still breaks a bit of the abstraction!
- // we should have nested configurations for the local strategies to solve that
- if (inputNum != 0) {
- throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
- }
-
- // instantiate ourselves a combiner. we should not use the stub, because the sort and the
- // subsequent (group)reduce would otherwise share it multi-threaded
- final Class<S> userCodeFunctionType = this.driver.getStubType();
- if (userCodeFunctionType == null) {
- throw new IllegalStateException("Performing combining sort outside a reduce task!");
- }
- final S localStub;
- try {
- localStub = initStub(userCodeFunctionType);
- } catch (Exception e) {
- throw new RuntimeException("Initializing the user code and the configuration failed" +
- (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
- }
-
- if (!(localStub instanceof GroupCombineFunction)) {
- throw new IllegalStateException("Performing combining sort outside a reduce task!");
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
- (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
- this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
- this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
- this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
- cSorter.setUdfConfiguration(this.config.getStubParameters());
-
- // set the input to null such that it will be lazily fetched from the input strategy
- this.inputs[inputNum] = null;
- this.localStrategies[inputNum] = cSorter;
- break;
- default:
- throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
- }
- } else {
- // no local strategy in the config
- this.inputs[inputNum] = this.inputIterators[inputNum];
- }
- }
-
- private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
- TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, getUserCodeClassLoader());
- if (compFact == null) {
- throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
- }
- return compFact.createComparator();
- }
-
- protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) {
- @SuppressWarnings("unchecked")
- MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
- return iter;
- }
-
- protected int getNumTaskInputs() {
- return this.driver.getNumberOfInputs();
- }
-
- /**
- * Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers.
- * The output collector applies the configured shipping strategies for each writer.
- */
- protected void initOutputs() throws Exception {
- this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
- this.eventualOutputs = new ArrayList<RecordWriter<?>>();
-
- ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-
- AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
-
- this.accumulatorMap = accumulatorRegistry.getUserMap();
-
- this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs,
- this.getExecutionConfig(), reporter, this.accumulatorMap);
- }
-
- public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
- Environment env = getEnvironment();
-
- return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
- env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
- env.getDistributedCacheEntries(), this.accumulatorMap);
- }
-
- // --------------------------------------------------------------------------------------------
- // Task Context Signature
- // -------------------------------------------------------------------------------------------
-
- @Override
- public TaskConfig getTaskConfig() {
- return this.config;
- }
-
- @Override
- public TaskManagerRuntimeInfo getTaskManagerInfo() {
- return getEnvironment().getTaskManagerInfo();
- }
-
- @Override
- public MemoryManager getMemoryManager() {
- return getEnvironment().getMemoryManager();
- }
-
- @Override
- public IOManager getIOManager() {
- return getEnvironment().getIOManager();
- }
-
- @Override
- public S getStub() {
- return this.stub;
- }
-
- @Override
- public Collector<OT> getOutputCollector() {
- return this.output;
- }
-
- @Override
- public AbstractInvokable getOwningNepheleTask() {
- return this;
- }
-
- @Override
- public String formatLogString(String message) {
- return constructLogString(message, getEnvironment().getTaskName(), this);
- }
-
- @Override
- public <X> MutableObjectIterator<X> getInput(int index) {
- if (index < 0 || index > this.driver.getNumberOfInputs()) {
- throw new IndexOutOfBoundsException();
- }
-
- // check for lazy assignment from input strategies
- if (this.inputs[index] != null) {
- @SuppressWarnings("unchecked")
- MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
- return in;
- } else {
- final MutableObjectIterator<X> in;
- try {
- if (this.tempBarriers[index] != null) {
- @SuppressWarnings("unchecked")
- MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.tempBarriers[index].getIterator();
- in = iter;
- } else if (this.localStrategies[index] != null) {
- @SuppressWarnings("unchecked")
- MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator();
- in = iter;
- } else {
- throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
- }
- this.inputs[index] = in;
- return in;
- } catch (InterruptedException iex) {
- throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.");
- } catch (IOException ioex) {
- throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".");
- }
- }
- }
-
-
- @Override
- public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
- if (index < 0 || index >= this.driver.getNumberOfInputs()) {
- throw new IndexOutOfBoundsException();
- }
-
- @SuppressWarnings("unchecked")
- final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.inputSerializers[index];
- return serializerFactory;
- }
-
-
- @Override
- public <X> TypeComparator<X> getDriverComparator(int index) {
- if (this.inputComparators == null) {
- throw new IllegalStateException("Comparators have not been created!");
- }
- else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
- throw new IndexOutOfBoundsException();
- }
-
- @SuppressWarnings("unchecked")
- final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index];
- return comparator;
- }
-
- // ============================================================================================
- // Static Utilities
- //
- // Utilities are consolidated here to ensure a uniform way of running,
- // logging, exception handling, and error messages.
- // ============================================================================================
-
- // --------------------------------------------------------------------------------------------
- // Logging
- // --------------------------------------------------------------------------------------------
- /**
- * Utility function that composes a string for logging purposes. The string includes the given message,
- * the given name of the task and the index in its subtask group as well as the number of instances
- * that exist in its subtask group.
- *
- * @param message The main message for the log.
- * @param taskName The name of the task.
- * @param parent The nephele task that contains the code producing the message.
- *
- * @return The string for logging.
- */
- public static String constructLogString(String message, String taskName, AbstractInvokable parent) {
- return message + ": " + taskName + " (" + (parent.getEnvironment().getIndexInSubtaskGroup() + 1) +
- '/' + parent.getEnvironment().getNumberOfSubtasks() + ')';
- }
-
- /**
- * Prints an error message and throws the given exception. If the exception is of the type
- * {@link ExceptionInChainedStubException} then the chain of contained exceptions is followed
- * until an exception of a different type is found.
- *
- * @param ex The exception to be thrown.
- * @param parent The parent task, whose information is included in the log message.
- * @throws Exception Always thrown.
- */
- public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception {
- String taskName;
- if (ex instanceof ExceptionInChainedStubException) {
- do {
- ExceptionInChainedStubException cex = (ExceptionInChainedStubException) ex;
- taskName = cex.getTaskName();
- ex = cex.getWrappedException();
- } while (ex instanceof ExceptionInChainedStubException);
- } else {
- taskName = parent.getEnvironment().getTaskName();
- }
-
- if (LOG.isErrorEnabled()) {
- LOG.error(constructLogString("Error in task code", taskName, parent), ex);
- }
-
- throw ex;
- }
-
- // --------------------------------------------------------------------------------------------
- // Result Shipping and Chained Tasks
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates the {@link Collector} for the given task, as described by the given configuration. The
- * output collector contains the writers that forward the data to the different tasks that the given task
- * is connected to. Each writer applies the partitioning as described in the configuration.
- *
- * @param task The task that the output collector is created for.
- * @param config The configuration describing the output shipping strategies.
- * @param cl The classloader used to load user defined types.
- * @param eventualOutputs The output writers that this task forwards to the next task for each output.
- * @param outputOffset The offset to start to get the writers for the outputs
- * @param numOutputs The number of outputs described in the configuration.
- *
- * @return The OutputCollector that data produced in this task is submitted to.
- */
- public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl,
- List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception
- {
- if (numOutputs == 0) {
- return null;
- }
-
- // get the factory for the serializer
- final TypeSerializerFactory<T> serializerFactory = config.getOutputSerializer(cl);
-
- // special case the Record
- if (serializerFactory.getDataType().equals(Record.class)) {
- final List<RecordWriter<Record>> writers = new ArrayList<RecordWriter<Record>>(numOutputs);
-
- // create a writer for each output
- for (int i = 0; i < numOutputs; i++) {
- // create the OutputEmitter from output ship strategy
- final ShipStrategyType strategy = config.getOutputShipStrategy(i);
- final TypeComparatorFactory<?> compFact = config.getOutputComparator(i, cl);
- final RecordOutputEmitter oe;
- if (compFact == null) {
- oe = new RecordOutputEmitter(strategy);
- } else {
- @SuppressWarnings("unchecked")
- TypeComparator<Record> comparator = (TypeComparator<Record>) compFact.createComparator();
- if (!comparator.supportsCompareAgainstReference()) {
- throw new Exception("Incompatibe serializer-/comparator factories.");
- }
- final DataDistribution distribution = config.getOutputDataDistribution(i, cl);
- final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
-
- oe = new RecordOutputEmitter(strategy, comparator, partitioner, distribution);
- }
-
- // setup accumulator counters
- final RecordWriter<Record> recordWriter = new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe);
- recordWriter.setReporter(reporter);
-
- writers.add(recordWriter);
- }
- if (eventualOutputs != null) {
- eventualOutputs.addAll(writers);
- }
-
- @SuppressWarnings("unchecked")
- final Collector<T> outColl = (Collector<T>) new RecordOutputCollector(writers);
- return outColl;
- }
- else {
- // generic case
- final List<RecordWriter<SerializationDelegate<T>>> writers = new ArrayList<RecordWriter<SerializationDelegate<T>>>(numOutputs);
-
- // create a writer for each output
- for (int i = 0; i < numOutputs; i++)
- {
- // create the OutputEmitter from output ship strategy
- final ShipStrategyType strategy = config.getOutputShipStrategy(i);
- final TypeComparatorFactory<T> compFactory = config.getOutputComparator(i, cl);
-
- final ChannelSelector<SerializationDelegate<T>> oe;
- if (compFactory == null) {
- oe = new OutputEmitter<T>(strategy);
- }
- else {
- final DataDistribution dataDist = config.getOutputDataDistribution(i, cl);
- final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
-
- final TypeComparator<T> comparator = compFactory.createComparator();
- oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist);
- }
-
- final RecordWriter<SerializationDelegate<T>> recordWriter =
- new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
-
- // setup live accumulator counters
- recordWriter.setReporter(reporter);
-
- writers.add(recordWriter);
- }
- if (eventualOutputs != null) {
- eventualOutputs.addAll(writers);
- }
- return new OutputCollector<T>(writers, serializerFactory.getSerializer());
- }
- }
-
- /**
- * Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers.
- * The output collector applies the configured shipping strategy.
- */
- @SuppressWarnings("unchecked")
- public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config,
- List<ChainedDriver<?, ?>> chainedTasksTarget,
- List<RecordWriter<?>> eventualOutputs,
- ExecutionConfig executionConfig,
- AccumulatorRegistry.Reporter reporter,
- Map<String, Accumulator<?,?>> accumulatorMap)
- throws Exception
- {
- final int numOutputs = config.getNumOutputs();
-
- // check whether we got any chained tasks
- final int numChained = config.getNumberOfChainedStubs();
- if (numChained > 0) {
- // got chained stubs. that means that this one may only have a single forward connection
- if (numOutputs != 1 || config.getOutputShipStrategy(0) != ShipStrategyType.FORWARD) {
- throw new RuntimeException("Plan Generation Bug: Found a chained stub that is not connected via an only forward connection.");
- }
-
- // instantiate each task
- @SuppressWarnings("rawtypes")
- Collector previous = null;
- for (int i = numChained - 1; i >= 0; --i)
- {
- // get the task first
- final ChainedDriver<?, ?> ct;
- try {
- Class<? extends ChainedDriver<?, ?>> ctc = config.getChainedTask(i);
- ct = ctc.newInstance();
- }
- catch (Exception ex) {
- throw new RuntimeException("Could not instantiate chained task driver.", ex);
- }
-
- // get the configuration for the task
- final TaskConfig chainedStubConf = config.getChainedStubConfig(i);
- final String taskName = config.getChainedTaskName(i);
-
- if (i == numChained - 1) {
- // last in chain, instantiate the output collector for this task
- previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter);
- }
-
- ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap);
- chainedTasksTarget.add(0, ct);
-
- previous = ct;
- }
- // the collector of the first in the chain is the collector for the nephele task
- return (Collector<T>) previous;
- }
- // else
-
- // instantiate the output collector the default way from this configuration
- return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs, reporter);
- }
-
- // --------------------------------------------------------------------------------------------
- // User Code LifeCycle
- // --------------------------------------------------------------------------------------------
-
- /**
- * Opens the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. If the open call produces
- * an exception, a new exception with a standard error message is created, using the encountered exception
- * as its cause.
- *
- * @param stub The user code instance to be opened.
- * @param parameters The parameters supplied to the user code.
- *
- * @throws Exception Thrown, if the user code's open method produces an exception.
- */
- public static void openUserCode(Function stub, Configuration parameters) throws Exception {
- try {
- FunctionUtils.openFunction(stub, parameters);
- } catch (Throwable t) {
- throw new Exception("The user defined 'open(Configuration)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), t);
- }
- }
-
- /**
- * Closes the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#close()} method. If the close call produces
- * an exception, a new exception with a standard error message is created, using the encountered exception
- * as its cause.
- *
- * @param stub The user code instance to be closed.
- *
- * @throws Exception Thrown, if the user code's close method produces an exception.
- */
- public static void closeUserCode(Function stub) throws Exception {
- try {
- FunctionUtils.closeFunction(stub);
- } catch (Throwable t) {
- throw new Exception("The user defined 'close()' method caused an exception: " + t.getMessage(), t);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Chained Task LifeCycle
- // --------------------------------------------------------------------------------------------
-
- /**
- * Opens all chained tasks, in the order as they are stored in the array. The opening process
- * creates a standardized log info message.
- *
- * @param tasks The tasks to be opened.
- * @param parent The parent task, used to obtain parameters to include in the log message.
- * @throws Exception Thrown, if the opening encounters an exception.
- */
- public static void openChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception {
- // start all chained tasks
- for (int i = 0; i < tasks.size(); i++) {
- final ChainedDriver<?, ?> task = tasks.get(i);
- if (LOG.isDebugEnabled()) {
- LOG.debug(constructLogString("Start task code", task.getTaskName(), parent));
- }
- task.openTask();
- }
- }
-
- /**
- * Closes all chained tasks, in the order as they are stored in the array. The closing process
- * creates a standardized log info message.
- *
- * @param tasks The tasks to be closed.
- * @param parent The parent task, used to obtain parameters to include in the log message.
- * @throws Exception Thrown, if the closing encounters an exception.
- */
- public static void closeChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception {
- for (int i = 0; i < tasks.size(); i++) {
- final ChainedDriver<?, ?> task = tasks.get(i);
- task.closeTask();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(constructLogString("Finished task code", task.getTaskName(), parent));
- }
- }
- }
-
- /**
- * Cancels all tasks via their {@link ChainedDriver#cancelTask()} method. Any occurring exception
- * and error is suppressed, such that the canceling method of every task is invoked in all cases.
- *
- * @param tasks The tasks to be canceled.
- */
- public static void cancelChainedTasks(List<ChainedDriver<?, ?>> tasks) {
- for (int i = 0; i < tasks.size(); i++) {
- try {
- tasks.get(i).cancelTask();
- } catch (Throwable t) {
- // do nothing
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Miscellaneous Utilities
- // --------------------------------------------------------------------------------------------
-
- /**
- * Instantiates a user code class from is definition in the task configuration.
- * The class is instantiated without arguments using the null-ary constructor. Instantiation
- * will fail if this constructor does not exist or is not public.
- *
- * @param <T> The generic type of the user code class.
- * @param config The task configuration containing the class description.
- * @param cl The class loader to be used to load the class.
- * @param superClass The super class that the user code class extends or implements, for type checking.
- *
- * @return An instance of the user code class.
- */
- public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass) {
- try {
- T stub = config.<T>getStubWrapper(cl).getUserCodeObject(superClass, cl);
- // check if the class is a subclass, if the check is required
- if (superClass != null && !superClass.isAssignableFrom(stub.getClass())) {
- throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
- superClass.getName() + "' as is required.");
- }
- return stub;
- }
- catch (ClassCastException ccex) {
- throw new RuntimeException("The UDF class is not a proper subclass of " + superClass.getName(), ccex);
- }
- }
-
- private static int[] asArray(List<Integer> list) {
- int[] a = new int[list.size()];
-
- int i = 0;
- for (int val : list) {
- a[i++] = val;
- }
- return a;
- }
-
- public static void clearWriters(List<RecordWriter<?>> writers) {
- for (RecordWriter<?> writer : writers) {
- writer.clearBuffers();
- }
- }
-
- public static void clearReaders(MutableReader<?>[] readers) {
- for (MutableReader<?> reader : readers) {
- reader.clearBuffers();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java
new file mode 100644
index 0000000..0ca7994
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.functions.Function;
+
+
+/**
+ * This interface marks a {@code Driver} as resettable, meaning that it will reset part of its internal state but
+ * otherwise reuse existing data structures.
+ *
+ * @see Driver
+ * @see TaskContext
+ *
+ * @param <S> The type of stub driven by this driver.
+ * @param <OT> The data type of the records produced by this driver.
+ */
+public interface ResettableDriver<S extends Function, OT> extends Driver<S, OT> {
+
+ boolean isInputResettable(int inputNum);
+
+ void initialize() throws Exception;
+
+ void reset() throws Exception;
+
+ void teardown() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java
deleted file mode 100644
index 85cde1b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.functions.Function;
-
-
-/**
- * This interface marks a {@code PactDriver} as resettable, meaning that will reset part of their internal state but
- * otherwise reuse existing data structures.
- *
- * @see PactDriver
- * @see PactTaskContext
- *
- * @param <S> The type of stub driven by this driver.
- * @param <OT> The data type of the records produced by this driver.
- */
-public interface ResettablePactDriver<S extends Function, OT> extends PactDriver<S, OT> {
-
- boolean isInputResettable(int inputNum);
-
- void initialize() throws Exception;
-
- void reset() throws Exception;
-
- void teardown() throws Exception;
-}