You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/11 14:15:28 UTC

[2/3] flink git commit: [FLINK-2138] [streaming] Added custom partitioning to DataStream

[FLINK-2138] [streaming] Added custom partitioning to DataStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97d10070
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97d10070
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97d10070

Branch: refs/heads/master
Commit: 97d10070c7ff5986b8e7ee08dcb6a7e74473cd25
Parents: 490076a
Author: Gábor Hermann <re...@gmail.com>
Authored: Fri Jun 26 17:23:36 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Sat Jul 11 14:00:56 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  4 +-
 .../streaming/api/datastream/DataStream.java    | 70 ++++++++++++++++++--
 .../partitioner/CustomPartitionerWrapper.java   | 57 ++++++++++++++++
 .../runtime/partitioner/StreamPartitioner.java  |  2 +-
 .../streaming/util/keys/KeySelectorUtil.java    | 40 +++++++++++
 .../flink/streaming/api/DataStreamTest.java     | 28 +++++++-
 6 files changed, 193 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e217e53..d24a350 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1128,14 +1128,14 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Partitions a DataSet on the key returned by the selector, using a custom partitioner.
-	 * This method takes the key selector t get the key to partition on, and a partitioner that
+	 * This method takes the key selector to get the key to partition on, and a partitioner that
 	 * accepts the key type.
 	 * <p>
 	 * Note: This method works only on single field keys, i.e. the selector cannot return tuples
 	 * of fields.
 	 * 
 	 * @param partitioner The partitioner to assign partitions to keys.
-	 * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned.
+	 * @param keyExtractor The KeyExtractor with which the DataSet is partitioned.
 	 * @return The partitioned DataSet.
 	 * 
 	 * @see KeySelector

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bf0ff23..8fb896e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -64,9 +66,10 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -81,7 +84,6 @@ import com.google.common.base.Preconditions;
  * <ul>
  * <li>{@link DataStream#map},</li>
  * <li>{@link DataStream#filter}, or</li>
- * <li>{@link DataStream#sum}.</li>
  * </ul>
  * 
  * @param <OUT>
@@ -451,6 +453,66 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Partitions a tuple DataStream on the specified key field using a custom partitioner.
+	 * This method takes the key position to partition on, and a partitioner that accepts the key type.
+	 * <p>
+	 * Note: This method works only on single field keys.
+	 *
+	 * @param partitioner The partitioner to assign partitions to keys.
+	 * @param field The field index on which the DataStream is to be partitioned.
+	 * @return The partitioned DataStream.
+	 */
+	public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, int field) {
+		Keys.ExpressionKeys<OUT> outExpressionKeys = new Keys.ExpressionKeys<OUT>(new int[]{field}, getType());
+		return partitionCustom(partitioner, outExpressionKeys);
+	}
+
+	/**
+	 * Partitions a POJO DataStream on the specified key field using a custom partitioner.
+	 * This method takes the key expression to partition on, and a partitioner that accepts the key type.
+	 * <p>
+	 * Note: This method works only on single field keys.
+	 *
+	 * @param partitioner The partitioner to assign partitions to keys.
+	 * @param field The field expression on which the DataStream is to be partitioned.
+	 * @return The partitioned DataStream.
+	 */
+	public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, String field) {
+		Keys.ExpressionKeys<OUT> outExpressionKeys = new Keys.ExpressionKeys<OUT>(new String[]{field}, getType());
+		return partitionCustom(partitioner, outExpressionKeys);
+	}
+
+
+	/**
+	 * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
+	 * This method takes the key selector to get the key to partition on, and a partitioner that
+	 * accepts the key type.
+	 * <p>
+	 * Note: This method works only on single field keys, i.e. the selector cannot return tuples
+	 * of fields.
+	 *
+	 * @param partitioner
+	 * 		The partitioner to assign partitions to keys.
+	 * @param keySelector
+	 * 		The KeySelector with which the DataStream is partitioned.
+	 * @return The partitioned DataStream.
+	 * @see KeySelector
+	 */
+	public <K extends Comparable<K>> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, KeySelector<OUT, K> keySelector) {
+		return setConnectionType(new CustomPartitionerWrapper<K, OUT>(clean(partitioner), clean(keySelector)));
+	}
+
+	//	private helper method for custom partitioning
+	private <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, Keys<OUT> keys) {
+		KeySelector<OUT, K> keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig());
+
+		return setConnectionType(
+				new CustomPartitionerWrapper<K, OUT>(
+						clean(partitioner),
+						clean(keySelector)));
+	}
+
+	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
 	 * are broadcasted to every parallel instance of the next component.
 	 *
@@ -530,7 +592,7 @@ public class DataStream<OUT> {
 	 * iteration head. The user can also use different feedback type than the
 	 * input of the iteration and treat the input and feedback streams as a
 	 * {@link ConnectedDataStream} be calling
-	 * {@link IterativeDataStream#withFeedbackType(TypeInfo)}
+	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
 	 * <p>
 	 * A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
@@ -561,7 +623,7 @@ public class DataStream<OUT> {
 	 * iteration head. The user can also use different feedback type than the
 	 * input of the iteration and treat the input and feedback streams as a
 	 * {@link ConnectedDataStream} be calling
-	 * {@link IterativeDataStream#withFeedbackType(TypeInfo)}
+	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
 	 * <p>
 	 * A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
new file mode 100644
index 0000000..75867cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that selects the channel with a user defined partitioner function on a key.
+ *
+ * @param <K>
+ *            Type of the key
+ * @param <T>
+ *            Type of the data
+ */
+public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[1];
+	Partitioner<K> partitioner;
+	KeySelector<T, K> keySelector;
+
+	public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
+		super(PartitioningStrategy.CUSTOM);
+		this.partitioner = partitioner;
+		this.keySelector = keySelector;
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+
+		K key = record.getInstance().getKey(keySelector);
+
+		returnArray[0] = partitioner.partition(key,
+				numberOfOutputChannels);
+
+		return returnArray;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index 3af7c7a..ef598c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements
 
 	public enum PartitioningStrategy {
 
-		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY
+		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY, CUSTOM
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 77467b5..49f2fe0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util.keys;
 import java.lang.reflect.Array;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -71,6 +72,45 @@ public class KeySelectorUtil {
 		return new ComparableKeySelector<X>(comparator, keyLength);
 	}
 
+	public static <X, K> KeySelector<X, K> getSelectorForOneKey(Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo,
+			ExecutionConfig executionConfig) {
+		if (partitioner != null) {
+			keys.validateCustomPartitioner(partitioner, null);
+		}
+
+		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
+
+		if (logicalKeyPositions.length != 1) {
+			throw new IllegalArgumentException("There must be exactly 1 key specified");
+		}
+
+		TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
+				logicalKeyPositions, new boolean[1], 0, executionConfig);
+		return new OneKeySelector<X, K>(comparator);
+	}
+
+	public static class OneKeySelector<IN, K> implements KeySelector<IN, K> {
+
+		private static final long serialVersionUID = 1L;
+
+		private TypeComparator<IN> comparator;
+		private Object[] keyArray;
+		private K key;
+
+		public OneKeySelector(TypeComparator<IN> comparator) {
+			this.comparator = comparator;
+			keyArray = new Object[1];
+		}
+
+		@Override
+		public K getKey(IN value) throws Exception {
+			comparator.extractKeys(value, keyArray, 0);
+			key = (K) keyArray[0];
+			return key;
+		}
+
+	}
+
 	public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index f3b98b2..764c6f2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -52,6 +53,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -64,7 +66,7 @@ import org.junit.Test;
 public class DataStreamTest {
 
 	private static final long MEMORYSIZE = 32;
-	private static int PARALLELISM = 1;
+	private static int PARALLELISM = 2;
 
 	/**
 	 * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
@@ -167,6 +169,26 @@ public class DataStreamTest {
 		assertFalse(isGrouped(partition2));
 		assertFalse(isGrouped(partition4));
 
+		// Testing DataStream custom partitioning
+		Partitioner<Long> longPartitioner = new Partitioner<Long>() {
+			@Override
+			public int partition(Long key, int numPartitions) {
+				return 100;
+			}
+		};
+
+		DataStream customPartition1 = src1.partitionCustom(longPartitioner, 0);
+		DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0");
+		DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
+
+		assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition1.getId(), createDownStreamId(customPartition1))));
+		assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition3.getId(), createDownStreamId(customPartition3))));
+		assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition4.getId(), createDownStreamId(customPartition4))));
+
+		assertFalse(isGrouped(customPartition1));
+		assertFalse(isGrouped(customPartition3));
+		assertFalse(isGrouped(customPartition4));
+
 		//Testing ConnectedDataStream grouping
 		ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0);
 		Integer downStreamId1 = createDownStreamId(connectedGroup1);
@@ -524,6 +546,10 @@ public class DataStreamTest {
 		return edge.getPartitioner() instanceof FieldsPartitioner;
 	}
 
+	private static boolean isCustomPartitioned(StreamEdge edge) {
+		return edge.getPartitioner() instanceof CustomPartitionerWrapper;
+	}
+
 	private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
 		@Override
 		public Long getKey(Tuple2<Long, Long> value) throws Exception {