You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/11 14:15:28 UTC
[2/3] flink git commit: [FLINK-2138] [streaming] Added custom
partitioning to DataStream
[FLINK-2138] [streaming] Added custom partitioning to DataStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97d10070
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97d10070
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97d10070
Branch: refs/heads/master
Commit: 97d10070c7ff5986b8e7ee08dcb6a7e74473cd25
Parents: 490076a
Author: Gábor Hermann <re...@gmail.com>
Authored: Fri Jun 26 17:23:36 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Sat Jul 11 14:00:56 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 4 +-
.../streaming/api/datastream/DataStream.java | 70 ++++++++++++++++++--
.../partitioner/CustomPartitionerWrapper.java | 57 ++++++++++++++++
.../runtime/partitioner/StreamPartitioner.java | 2 +-
.../streaming/util/keys/KeySelectorUtil.java | 40 +++++++++++
.../flink/streaming/api/DataStreamTest.java | 28 +++++++-
6 files changed, 193 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e217e53..d24a350 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1128,14 +1128,14 @@ public abstract class DataSet<T> {
/**
* Partitions a DataSet on the key returned by the selector, using a custom partitioner.
- * This method takes the key selector t get the key to partition on, and a partitioner that
+ * This method takes the key selector to get the key to partition on, and a partitioner that
* accepts the key type.
* <p>
* Note: This method works only on single field keys, i.e. the selector cannot return tuples
* of fields.
*
* @param partitioner The partitioner to assign partitions to keys.
- * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned.
+ * @param keyExtractor The KeyExtractor with which the DataSet is partitioned.
* @return The partitioned DataSet.
*
* @see KeySelector
http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bf0ff23..8fb896e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -64,9 +66,10 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -81,7 +84,6 @@ import com.google.common.base.Preconditions;
* <ul>
* <li>{@link DataStream#map},</li>
* <li>{@link DataStream#filter}, or</li>
- * <li>{@link DataStream#sum}.</li>
* </ul>
*
* @param <OUT>
@@ -451,6 +453,66 @@ public class DataStream<OUT> {
}
/**
+ * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
+ * This method takes the key position to partition on, and a partitioner that accepts the key type.
+ * <p>
+ * Note: This method works only on single field keys.
+ *
+ * @param partitioner The partitioner to assign partitions to keys.
+ * @param field The field index on which the DataStream is to be partitioned.
+ * @return The partitioned DataStream.
+ */
+ public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, int field) {
+     int[] keyPositions = {field}; // exactly one tuple position serves as the key
+     return partitionCustom(partitioner, new Keys.ExpressionKeys<OUT>(keyPositions, getType()));
+ }
+
+ /**
+ * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
+ * This method takes the key expression to partition on, and a partitioner that accepts the key type.
+ * <p>
+ * Note: This method works only on single field keys.
+ *
+ * @param partitioner The partitioner to assign partitions to keys.
+ * @param field The field expression on which the DataStream is to be partitioned.
+ * @return The partitioned DataStream.
+ */
+ public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, String field) {
+     String[] keyExpressions = {field}; // exactly one field expression serves as the key
+     return partitionCustom(partitioner, new Keys.ExpressionKeys<OUT>(keyExpressions, getType()));
+ }
+
+
+ /**
+ * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
+ * The key selector extracts the key from each element, and the partitioner maps that key
+ * to an output channel.
+ * <p>
+ * The key type {@code K} is unbounded, matching the field-based {@code partitionCustom}
+ * overloads and {@link CustomPartitionerWrapper}; it only has to be the type the
+ * partitioner accepts. Both functions are passed through {@code clean} before being wrapped.
+ *
+ * @param partitioner The partitioner to assign partitions to keys.
+ * @param keySelector The KeySelector with which the DataStream is partitioned.
+ * @param <K> The type of the key produced by the selector and consumed by the partitioner.
+ * @return The partitioned DataStream.
+ * @see KeySelector
+ */
+ public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, KeySelector<OUT, K> keySelector) {
+     return setConnectionType(new CustomPartitionerWrapper<K, OUT>(clean(partitioner), clean(keySelector)));
+ }
+
+ // Shared implementation behind the field-based overloads: turns the single key into a selector.
+ private <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, Keys<OUT> keys) {
+     // getSelectorForOneKey also receives the partitioner, next to the stream type and config
+     KeySelector<OUT, K> singleKeySelector =
+             KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig());
+     Partitioner<K> cleanedPartitioner = clean(partitioner);
+     KeySelector<OUT, K> cleanedSelector = clean(singleKeySelector);
+     return setConnectionType(new CustomPartitionerWrapper<K, OUT>(cleanedPartitioner, cleanedSelector));
+ }
+
+ /**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are broadcasted to every parallel instance of the next component.
*
@@ -530,7 +592,7 @@ public class DataStream<OUT> {
* iteration head. The user can also use different feedback type than the
* input of the iteration and treat the input and feedback streams as a
* {@link ConnectedDataStream} be calling
- * {@link IterativeDataStream#withFeedbackType(TypeInfo)}
+ * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
* <p>
* A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
@@ -561,7 +623,7 @@ public class DataStream<OUT> {
* iteration head. The user can also use different feedback type than the
* input of the iteration and treat the input and feedback streams as a
* {@link ConnectedDataStream} be calling
- * {@link IterativeDataStream#withFeedbackType(TypeInfo)}
+ * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
* <p>
* A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
new file mode 100644
index 0000000..75867cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that selects the channel with a user defined partitioner function on a key.
+ *
+ * @param <K>
+ * Type of the key
+ * @param <T>
+ * Type of the data
+ */
+public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
+ private static final long serialVersionUID = 1L;
+
+ // One-slot result buffer; the same array instance is handed back by every selectChannels call.
+ private int[] returnArray = new int[1];
+ Partitioner<K> partitioner;
+ KeySelector<T, K> keySelector;
+
+ /** Wraps the given partitioner and key selector under the CUSTOM partitioning strategy. */
+ public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
+     super(PartitioningStrategy.CUSTOM);
+     this.keySelector = keySelector;
+     this.partitioner = partitioner;
+ }
+
+ /** Extracts the record's key and returns the single channel the user partitioner picks for it. */
+ @Override
+ public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
+     StreamRecord<T> streamRecord = record.getInstance();
+     K recordKey = streamRecord.getKey(keySelector);
+     int targetChannel = partitioner.partition(recordKey, numberOfOutputChannels);
+     returnArray[0] = targetChannel;
+     return returnArray;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index 3af7c7a..ef598c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements
public enum PartitioningStrategy {
- FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY
+ FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY, CUSTOM
}
http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 77467b5..49f2fe0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util.keys;
import java.lang.reflect.Array;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -71,6 +72,45 @@ public class KeySelectorUtil {
return new ComparableKeySelector<X>(comparator, keyLength);
}
+ /** Creates a selector for the one key field described by {@code keys}; any other key count is rejected. */
+ public static <X, K> KeySelector<X, K> getSelectorForOneKey(Keys<X> keys, Partitioner<K> partitioner,
+         TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
+     // The partitioner is optional here; when present it is validated against the keys first.
+     if (null != partitioner) {
+         keys.validateCustomPartitioner(partitioner, null);
+     }
+     int[] keyPositions = keys.computeLogicalKeyPositions();
+     if (1 != keyPositions.length) {
+         throw new IllegalArgumentException("There must be exactly 1 key specified");
+     }
+
+     CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
+     boolean[] orders = new boolean[1];
+     return new OneKeySelector<X, K>(compositeType.createComparator(keyPositions, orders, 0, executionConfig));
+ }
+
+ /** Key selector that extracts a single key field from a value via a {@link TypeComparator}. */
+ public static class OneKeySelector<IN, K> implements KeySelector<IN, K> {
+
+     private static final long serialVersionUID = 1L;
+
+     private final TypeComparator<IN> comparator;
+     // One-slot scratch buffer that extractKeys writes the key into; reused across calls.
+     private final Object[] keyArray = new Object[1];
+
+     public OneKeySelector(TypeComparator<IN> comparator) {
+         this.comparator = comparator;
+     }
+
+     /** Returns the key the comparator extracts from {@code value}; nothing is cached in a field. */
+     @Override
+     @SuppressWarnings("unchecked") // the slot holds whatever extractKeys stored; the caller types it as K
+     public K getKey(IN value) throws Exception {
+         comparator.extractKeys(value, keyArray, 0);
+         return (K) keyArray[0];
+     }
+ }
+
public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index f3b98b2..764c6f2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -52,6 +53,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -64,7 +66,7 @@ import org.junit.Test;
public class DataStreamTest {
private static final long MEMORYSIZE = 32;
- private static int PARALLELISM = 1;
+ private static int PARALLELISM = 2;
/**
* Tests {@link SingleOutputStreamOperator#name(String)} functionality.
@@ -167,6 +169,26 @@ public class DataStreamTest {
assertFalse(isGrouped(partition2));
assertFalse(isGrouped(partition4));
+ // Testing DataStream custom partitioning
+ Partitioner<Long> longPartitioner = new Partitioner<Long>() {
+ @Override
+ public int partition(Long key, int numPartitions) {
+ return 100;
+ }
+ };
+
+ DataStream customPartition1 = src1.partitionCustom(longPartitioner, 0);
+ DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0");
+ DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
+
+ assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition1.getId(), createDownStreamId(customPartition1))));
+ assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition3.getId(), createDownStreamId(customPartition3))));
+ assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition4.getId(), createDownStreamId(customPartition4))));
+
+ assertFalse(isGrouped(customPartition1));
+ assertFalse(isGrouped(customPartition3));
+ assertFalse(isGrouped(customPartition4));
+
//Testing ConnectedDataStream grouping
ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0);
Integer downStreamId1 = createDownStreamId(connectedGroup1);
@@ -524,6 +546,10 @@ public class DataStreamTest {
return edge.getPartitioner() instanceof FieldsPartitioner;
}
+ private static boolean isCustomPartitioned(StreamEdge edge) {
+     return CustomPartitionerWrapper.class.isInstance(edge.getPartitioner()); // same check as instanceof
+ }
+
private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
@Override
public Long getKey(Tuple2<Long, Long> value) throws Exception {