You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/23 20:15:33 UTC

[1/3] flink git commit: [FLINK-2662] [optimizer] Fix computation of global properties of union operator.

Repository: flink
Updated Branches:
  refs/heads/release-1.1 871de0bf7 -> 424fb24c3


[FLINK-2662] [optimizer] Fix computation of global properties of union operator.

- Fixes invalid shipping strategy between consecutive unions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efbd293a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efbd293a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efbd293a

Branch: refs/heads/release-1.1
Commit: efbd293afe4348b0f199e2c66a990ae6880edcef
Parents: 871de0b
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Nov 21 19:06:42 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 21:08:08 2016 +0100

----------------------------------------------------------------------
 .../operators/BinaryUnionOpDescriptor.java      |  30 ++-
 .../flink/optimizer/UnionReplacementTest.java   | 240 ++++++++++++++++++-
 2 files changed, 258 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efbd293a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
index 8cc517e..78ac3d6 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
@@ -69,11 +69,35 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual {
 		
 		if (in1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
 			in2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
-			in1.getPartitioningFields().equals(in2.getPartitioningFields()))
-		{
+			in1.getPartitioningFields().equals(in2.getPartitioningFields())) {
 			newProps.setHashPartitioned(in1.getPartitioningFields());
 		}
-		
+		else if (in1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+					in2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+					in1.getPartitioningOrdering().equals(in2.getPartitioningOrdering()) &&
+					(
+						in1.getDataDistribution() == null && in2.getDataDistribution() == null ||
+						in1.getDataDistribution() != null && in1.getDataDistribution().equals(in2.getDataDistribution())
+					)
+				) {
+			if (in1.getDataDistribution() == null) {
+				newProps.setRangePartitioned(in1.getPartitioningOrdering());
+			}
+			else {
+				newProps.setRangePartitioned(in1.getPartitioningOrdering(), in1.getDataDistribution());
+			}
+		}
+		else if (in1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+					in2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+					in1.getPartitioningFields().equals(in2.getPartitioningFields()) &&
+					in1.getCustomPartitioner().equals(in2.getCustomPartitioner())) {
+			newProps.setCustomPartitioned(in1.getPartitioningFields(), in1.getCustomPartitioner());
+		}
+		else if (in1.getPartitioning() == PartitioningProperty.FORCED_REBALANCED &&
+					in2.getPartitioning() == PartitioningProperty.FORCED_REBALANCED) {
+			newProps.setForcedRebalanced();
+		}
+
 		return newProps;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/efbd293a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index 3be7657..d0bb376 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -19,18 +19,25 @@
 package org.apache.flink.optimizer;
 
 import junit.framework.Assert;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
 import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 
@@ -87,7 +94,7 @@ public class UnionReplacementTest extends CompilerTestBase {
 	 *
 	 */
 	@Test
-	public void testUnionWithTwoOutputsTest() throws Exception {
+	public void testUnionWithTwoOutputs() throws Exception {
 
 		// -----------------------------------------------------------------------------------------
 		// Build test program
@@ -120,38 +127,253 @@ public class UnionReplacementTest extends CompilerTestBase {
 		SingleInputPlanNode groupRed2 = resolver.getNode("2");
 
 		// check partitioning is correct
-		Assert.assertTrue("Reduce input should be partitioned on 0.",
+		assertTrue("Reduce input should be partitioned on 0.",
 			groupRed1.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(0)));
-		Assert.assertTrue("Reduce input should be partitioned on 1.",
+		assertTrue("Reduce input should be partitioned on 1.",
 			groupRed2.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(1)));
 
 		// check group reduce inputs are n-ary unions with three inputs
-		Assert.assertTrue("Reduce input should be n-ary union with three inputs.",
+		assertTrue("Reduce input should be n-ary union with three inputs.",
 			groupRed1.getInput().getSource() instanceof NAryUnionPlanNode &&
 				((NAryUnionPlanNode) groupRed1.getInput().getSource()).getListOfInputs().size() == 3);
-		Assert.assertTrue("Reduce input should be n-ary union with three inputs.",
+		assertTrue("Reduce input should be n-ary union with three inputs.",
 			groupRed2.getInput().getSource() instanceof NAryUnionPlanNode &&
 				((NAryUnionPlanNode) groupRed2.getInput().getSource()).getListOfInputs().size() == 3);
 
 		// check channel from union to group reduce is forwarding
-		Assert.assertTrue("Channel between union and group reduce should be forwarding",
+		assertTrue("Channel between union and group reduce should be forwarding",
 			groupRed1.getInput().getShipStrategy().equals(ShipStrategyType.FORWARD));
-		Assert.assertTrue("Channel between union and group reduce should be forwarding",
+		assertTrue("Channel between union and group reduce should be forwarding",
 			groupRed2.getInput().getShipStrategy().equals(ShipStrategyType.FORWARD));
 
 		// check that all inputs of unions are hash partitioned
 		List<Channel> union123In = ((NAryUnionPlanNode) groupRed1.getInput().getSource()).getListOfInputs();
 		for(Channel i : union123In) {
-			Assert.assertTrue("Union input channel should hash partition on 0",
+			assertTrue("Union input channel should hash partition on 0",
 				i.getShipStrategy().equals(ShipStrategyType.PARTITION_HASH) &&
 					i.getShipStrategyKeys().isExactMatch(new FieldList(0)));
 		}
 		List<Channel> union234In = ((NAryUnionPlanNode) groupRed2.getInput().getSource()).getListOfInputs();
 		for(Channel i : union234In) {
-			Assert.assertTrue("Union input channel should hash partition on 0",
+			assertTrue("Union input channel should hash partition on 0",
 				i.getShipStrategy().equals(ShipStrategyType.PARTITION_HASH) &&
 					i.getShipStrategyKeys().isExactMatch(new FieldList(1)));
 		}
 
 	}
+
+	/**
+	 *
+	 * Checks that a plan with consecutive UNIONs followed by PartitionByHash is correctly translated.
+	 *
+	 * The program can be illustrated as follows:
+	 *
+	 * Src1 -\
+	 *        >-> Union12--<
+	 * Src2 -/              \
+	 *                       >-> Union123 -> PartitionByHash -> Output
+	 * Src3 ----------------/
+	 *
+	 * In the resulting plan, the hash partitioning (ShippingStrategy.PARTITION_HASH) must be
+	 * pushed to the inputs of the unions (Src1, Src2, Src3).
+	 *
+	 */
+	@Test
+	public void testConsecutiveUnionsWithHashPartitioning() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> union12 = src1.union(src2);
+		DataSet<Tuple2<Long, Long>> union123 = union12.union(src3);
+
+		union123.partitionByHash(1).output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("out");
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode sink = resolver.getNode("out");
+
+		// check partitioning is correct
+		assertEquals("Sink input should be hash partitioned.",
+			PartitioningProperty.HASH_PARTITIONED, sink.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Sink input should be hash partitioned on 1.",
+			new FieldList(1), sink.getInput().getGlobalProperties().getPartitioningFields());
+
+		SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
+		assertTrue(partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
+		assertEquals("Partitioner input should be hash partitioned.",
+			PartitioningProperty.HASH_PARTITIONED, partitioner.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Partitioner input should be hash partitioned on 1.",
+			new FieldList(1), partitioner.getInput().getGlobalProperties().getPartitioningFields());
+		assertEquals("Partitioner input channel should be forwarding",
+			ShipStrategyType.FORWARD, partitioner.getInput().getShipStrategy());
+
+		NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
+		// all union inputs should be hash partitioned
+		for (Channel c : union.getInputs()) {
+			assertEquals("Union input should be hash partitioned",
+				PartitioningProperty.HASH_PARTITIONED, c.getGlobalProperties().getPartitioning());
+			assertEquals("Union input channel should be hash partitioning",
+				ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+			assertTrue("Union input should be data source",
+				c.getSource() instanceof SourcePlanNode);
+		}
+	}
+
+	/**
+	 *
+	 * Checks that a plan with consecutive UNIONs followed by REBALANCE is correctly translated.
+	 *
+	 * The program can be illustrated as follows:
+	 *
+	 * Src1 -\
+	 *        >-> Union12--<
+	 * Src2 -/              \
+	 *                       >-> Union123 -> Rebalance -> Output
+	 * Src3 ----------------/
+	 *
+	 * In the resulting plan, the Rebalance (ShippingStrategy.PARTITION_FORCED_REBALANCE) must be
+	 * pushed to the inputs of the unions (Src1, Src2, Src3).
+	 *
+	 */
+	@Test
+	public void testConsecutiveUnionsWithRebalance() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> union12 = src1.union(src2);
+		DataSet<Tuple2<Long, Long>> union123 = union12.union(src3);
+
+		union123.rebalance().output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("out");
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode sink = resolver.getNode("out");
+
+		// check partitioning is correct
+		assertEquals("Sink input should be force rebalanced.",
+			PartitioningProperty.FORCED_REBALANCED, sink.getInput().getGlobalProperties().getPartitioning());
+
+		SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
+		assertTrue(partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
+		assertEquals("Partitioner input should be force rebalanced.",
+			PartitioningProperty.FORCED_REBALANCED, partitioner.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Partitioner input channel should be forwarding",
+			ShipStrategyType.FORWARD, partitioner.getInput().getShipStrategy());
+
+		NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
+		// all union inputs should be force rebalanced
+		for (Channel c : union.getInputs()) {
+			assertEquals("Union input should be force rebalanced",
+				PartitioningProperty.FORCED_REBALANCED, c.getGlobalProperties().getPartitioning());
+			assertEquals("Union input channel should be rebalancing",
+				ShipStrategyType.PARTITION_FORCED_REBALANCE, c.getShipStrategy());
+			assertTrue("Union input should be data source",
+				c.getSource() instanceof SourcePlanNode);
+		}
+	}
+
+	/**
+	 *
+	 * Checks that a plan with consecutive UNIONs followed by PARTITION_RANGE is correctly translated.
+	 *
+	 * The program can be illustrated as follows:
+	 *
+	 * Src1 -\
+	 *        >-> Union12--<
+	 * Src2 -/              \
+	 *                       >-> Union123 -> PartitionByRange -> Output
+	 * Src3 ----------------/
+	 *
+	 * In the resulting plan, the range partitioning must be
+	 * pushed to the inputs of the unions (Src1, Src2, Src3).
+	 *
+	 */
+	@Test
+	public void testConsecutiveUnionsWithRangePartitioning() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> union12 = src1.union(src2);
+		DataSet<Tuple2<Long, Long>> union123 = union12.union(src3);
+
+		union123.partitionByRange(1).output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("out");
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode sink = resolver.getNode("out");
+
+		// check partitioning is correct
+		assertEquals("Sink input should be range partitioned.",
+			PartitioningProperty.RANGE_PARTITIONED, sink.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Sink input should be range partitioned on 1",
+			new Ordering(1, null, Order.ASCENDING), sink.getInput().getGlobalProperties().getPartitioningOrdering());
+
+		SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
+		assertTrue(partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
+		assertEquals("Partitioner input should be range partitioned.",
+			PartitioningProperty.RANGE_PARTITIONED, partitioner.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Partitioner input should be range partitioned on 1",
+			new Ordering(1, null, Order.ASCENDING), partitioner.getInput().getGlobalProperties().getPartitioningOrdering());
+		assertEquals("Partitioner input channel should be forwarding",
+			ShipStrategyType.FORWARD, partitioner.getInput().getShipStrategy());
+
+		NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
+		// all union inputs should be force rebalanced
+		for (Channel c : union.getInputs()) {
+			assertEquals("Union input should be force rebalanced",
+				PartitioningProperty.RANGE_PARTITIONED, c.getGlobalProperties().getPartitioning());
+			assertEquals("Union input channel should be rebalancing",
+				ShipStrategyType.FORWARD, c.getShipStrategy());
+			// range partitioning is executed as custom partitioning with prior sampling
+			SingleInputPlanNode partitionMap = (SingleInputPlanNode)c.getSource();
+			assertEquals(DriverStrategy.MAP, partitionMap.getDriverStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionMap.getInput().getShipStrategy());
+		}
+	}
+
 }


[3/3] flink git commit: [hotfix] [streaming] Fix type extraction for joined streams.

Posted by fh...@apache.org.
[hotfix] [streaming] Fix type extraction for joined streams.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/424fb24c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/424fb24c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/424fb24c

Branch: refs/heads/release-1.1
Commit: 424fb24c3febc2f16004df77c470c01f7ea2337d
Parents: 074b4e6
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Nov 4 16:15:26 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 21:08:58 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/datastream/JoinedStreams.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/424fb24c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index 86c6226..a3a4beb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -259,7 +259,7 @@ public class JoinedStreams<T1, T2> {
 		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
 			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
 					function,
-					JoinFunction.class,
+					FlatJoinFunction.class,
 					true,
 					true,
 					input1.getType(),


[2/3] flink git commit: [hotfix][docs] Stream joins don't support tuple position keys

Posted by fh...@apache.org.
[hotfix][docs] Stream joins don't support tuple position keys


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/074b4e60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/074b4e60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/074b4e60

Branch: refs/heads/release-1.1
Commit: 074b4e60ae0ca6b9e2f0dc064ec6cbaf5543b199
Parents: efbd293
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Nov 4 15:41:06 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 21:08:29 2016 +0100

----------------------------------------------------------------------
 docs/apis/streaming/index.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/074b4e60/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index f6fbbd5..806ae4b 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -409,7 +409,7 @@ dataStream.union(otherStream1, otherStream2, ...);
             <p>Join two data streams on a given key and a common window.</p>
     {% highlight java %}
 dataStream.join(otherStream)
-    .where(0).equalTo(1)
+    .where(<key selector>).equalTo(<key selector>)
     .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply (new JoinFunction () {...});
     {% endhighlight %}
@@ -758,7 +758,7 @@ dataStream.union(otherStream1, otherStream2, ...)
             <p>Join two data streams on a given key and a common window.</p>
     {% highlight scala %}
 dataStream.join(otherStream)
-    .where(0).equalTo(1)
+    .where(<key selector>).equalTo(<key selector>)
     .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply { ... }
     {% endhighlight %}