You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 14:01:34 UTC
[6/7] flink git commit: [FLINK-785] ChainedAllReduce
[FLINK-785] ChainedAllReduce
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3a7350d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3a7350d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3a7350d
Branch: refs/heads/master
Commit: a3a7350d55a3e9b20a6c53aedd8d1c24fb188122
Parents: b4152d7
Author: zentol <s....@web.de>
Authored: Wed Feb 11 16:02:15 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/operators/DriverStrategy.java | 3 +-
.../chaining/ChainedAllReduceDriver.java | 112 +++++++++++++++++++
2 files changed, 114 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 7942b3b..4a0035c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
import static org.apache.flink.runtime.operators.DamBehavior.FULL_DAM;
import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING;
import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED;
+import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver;
import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
@@ -51,7 +52,7 @@ public enum DriverStrategy {
FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, 0),
// group everything together into one group and apply the Reduce function
- ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, 0),
+ ALL_REDUCE(AllReduceDriver.class, ChainedAllReduceDriver.class, PIPELINED, 0),
// group everything together into one group and apply the GroupReduce function
ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, 0),
// group everything together into one group and apply the GroupReduce's combine function
http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
new file mode 100644
index 0000000..4641fce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
+ private static final Logger LOG = LoggerFactory.getLogger(ChainedAllReduceDriver.class);
+
+ // --------------------------------------------------------------------------------------------
+ private ReduceFunction<IT> reducer;
+ private TypeSerializer<IT> serializer;
+
+ private IT base;
+
+ // --------------------------------------------------------------------------------------------
+ @Override
+ public void setup(AbstractInvokable parent) {
+ @SuppressWarnings("unchecked")
+ final ReduceFunction<IT> red = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, ReduceFunction.class);
+ this.reducer = red;
+ FunctionUtils.setFunctionRuntimeContext(red, getUdfRuntimeContext());
+
+ TypeSerializerFactory<IT> serializerFactory = this.config.getInputSerializer(0, userCodeClassLoader);
+ this.serializer = serializerFactory.getSerializer();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ChainedAllReduceDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+ }
+ }
+
+ @Override
+ public void openTask() throws Exception {
+ Configuration stubConfig = this.config.getStubParameters();
+ RegularPactTask.openUserCode(this.reducer, stubConfig);
+ }
+
+ @Override
+ public void closeTask() throws Exception {
+ RegularPactTask.closeUserCode(this.reducer);
+ }
+
+ @Override
+ public void cancelTask() {
+ try {
+ FunctionUtils.closeFunction(this.reducer);
+ } catch (Throwable t) {
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ @Override
+ public Function getStub() {
+ return this.reducer;
+ }
+
+ @Override
+ public String getTaskName() {
+ return this.taskName;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ @Override
+ public void collect(IT record) {
+ try {
+ if (base == null) {
+ base = objectReuseEnabled ? record : serializer.copy(record);
+ } else {
+ base = objectReuseEnabled ? reducer.reduce(base, record) : serializer.copy(reducer.reduce(base, record));
+ }
+ } catch (Exception e) {
+ throw new ExceptionInChainedStubException(taskName, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (base != null) {
+ this.outputCollector.collect(base);
+ base = null;
+ }
+ } catch (Exception e) {
+ throw new ExceptionInChainedStubException(this.taskName, e);
+ }
+ this.outputCollector.close();
+ }
+}