You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 14:01:34 UTC

[6/7] flink git commit: [FLINK-785] ChainedAllReduce

[FLINK-785] ChainedAllReduce


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3a7350d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3a7350d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3a7350d

Branch: refs/heads/master
Commit: a3a7350d55a3e9b20a6c53aedd8d1c24fb188122
Parents: b4152d7
Author: zentol <s....@web.de>
Authored: Wed Feb 11 16:02:15 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/DriverStrategy.java |   3 +-
 .../chaining/ChainedAllReduceDriver.java        | 112 +++++++++++++++++++
 2 files changed, 114 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 7942b3b..4a0035c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
 import static org.apache.flink.runtime.operators.DamBehavior.FULL_DAM;
 import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING;
 import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED;
+import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver;
 
 import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
@@ -51,7 +52,7 @@ public enum DriverStrategy {
 	FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, 0),
 
 	// group everything together into one group and apply the Reduce function
-	ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, 0),
+	ALL_REDUCE(AllReduceDriver.class, ChainedAllReduceDriver.class, PIPELINED, 0),
 	// group everything together into one group and apply the GroupReduce function
 	ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, 0),
 	// group everything together into one group and apply the GroupReduce's combine function

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
new file mode 100644
index 0000000..4641fce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chained driver that folds every record it receives into a single value using a
+ * {@link ReduceFunction} and forwards that value to the next collector when it is closed.
+ *
+ * <p>If no record was collected, nothing is emitted on {@link #close()}.
+ *
+ * @param <IT> the type of the incoming records, which is also the type of the reduced result
+ */
+public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
+	private static final Logger LOG = LoggerFactory.getLogger(ChainedAllReduceDriver.class);
+
+	// --------------------------------------------------------------------------------------------
+	/** The user-defined reduce function, instantiated in {@link #setup(AbstractInvokable)}. */
+	private ReduceFunction<IT> reducer;
+
+	/** Serializer of the input type, used to create copies of records. */
+	private TypeSerializer<IT> serializer;
+
+	/** The running aggregate; {@code null} until the first record has been collected. */
+	private IT base;
+
+	// --------------------------------------------------------------------------------------------
+	/**
+	 * Instantiates the user's reduce function, hands it the UDF runtime context and obtains the
+	 * serializer for input 0 from the task configuration.
+	 *
+	 * @param parent the invokable this driver is chained to (not used by this driver)
+	 */
+	@Override
+	public void setup(AbstractInvokable parent) {
+		@SuppressWarnings("unchecked")
+		final ReduceFunction<IT> red = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, ReduceFunction.class);
+		this.reducer = red;
+		FunctionUtils.setFunctionRuntimeContext(red, getUdfRuntimeContext());
+
+		TypeSerializerFactory<IT> serializerFactory = this.config.getInputSerializer(0, userCodeClassLoader);
+		this.serializer = serializerFactory.getSerializer();
+
+		// Parameterized logging: the message is only assembled when debug logging is active.
+		LOG.debug("ChainedAllReduceDriver object reuse: {}.", this.objectReuseEnabled ? "ENABLED" : "DISABLED");
+	}
+
+	/**
+	 * Opens the reduce function with the stub parameters taken from the task configuration.
+	 *
+	 * @throws Exception if opening the user code fails
+	 */
+	@Override
+	public void openTask() throws Exception {
+		Configuration stubConfig = this.config.getStubParameters();
+		RegularPactTask.openUserCode(this.reducer, stubConfig);
+	}
+
+	/**
+	 * Closes the reduce function.
+	 *
+	 * @throws Exception if closing the user code fails
+	 */
+	@Override
+	public void closeTask() throws Exception {
+		RegularPactTask.closeUserCode(this.reducer);
+	}
+
+	/**
+	 * Closes the reduce function on a best-effort basis; any failure while doing so is ignored.
+	 */
+	@Override
+	public void cancelTask() {
+		try {
+			FunctionUtils.closeFunction(this.reducer);
+		} catch (Throwable ignored) {
+			// Deliberately ignored: cancellation is best-effort and must not fail
+			// because the user function could not be closed cleanly.
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	/**
+	 * @return the reduce function run by this driver
+	 */
+	@Override
+	public Function getStub() {
+		return this.reducer;
+	}
+
+	/**
+	 * @return the name of this chained task
+	 */
+	@Override
+	public String getTaskName() {
+		return this.taskName;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	/**
+	 * Folds the given record into the running aggregate.
+	 *
+	 * <p>The first record becomes the initial aggregate. Every later record is combined with the
+	 * aggregate through the reduce function; unless object reuse is enabled, the result of the
+	 * reduce function is copied before it is stored.
+	 *
+	 * @param record the next input record
+	 * @throws ExceptionInChainedStubException if the reduce function or the serializer fails
+	 */
+	@Override
+	public void collect(IT record) {
+		try {
+			if (base == null) {
+				// Always copy the first record, even with object reuse enabled: the caller
+				// may reuse the passed instance for subsequent records, which would
+				// otherwise silently overwrite the aggregate held in 'base'.
+				base = serializer.copy(record);
+			} else {
+				final IT reduced = reducer.reduce(base, record);
+				base = objectReuseEnabled ? reduced : serializer.copy(reduced);
+			}
+		} catch (Exception e) {
+			throw new ExceptionInChainedStubException(taskName, e);
+		}
+	}
+
+	/**
+	 * Emits the aggregate (if any record was collected) to the output collector, resets the
+	 * aggregate and then closes the output collector.
+	 *
+	 * @throws ExceptionInChainedStubException if emitting the aggregate fails
+	 */
+	@Override
+	public void close() {
+		try {
+			if (base != null) {
+				this.outputCollector.collect(base);
+				base = null;
+			}
+		} catch (Exception e) {
+			throw new ExceptionInChainedStubException(this.taskName, e);
+		}
+		this.outputCollector.close();
+	}
+}