You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/06 15:52:10 UTC

[2/5] flink git commit: [FLINK-4460] Make CoProcessFunction abstract, add default onTimer() method

[FLINK-4460] Make CoProcessFunction abstract, add default onTimer() method

This is in preparation for allowing CoProcessFunction on a non-keyed
connected stream. We will use it to allow side outputs from the
ProcessFunction Context.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e12f3203
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e12f3203
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e12f3203

Branch: refs/heads/master
Commit: e12f3203eda7f071daeea7c59ca32b3db7763b68
Parents: 0228676
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 11:33:03 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        |  9 -----
 .../api/functions/co/CoProcessFunction.java     | 24 ++++++------
 .../api/functions/co/RichCoProcessFunction.java | 41 --------------------
 .../api/operators/co/CoProcessOperator.java     | 23 +++++------
 .../api/operators/co/CoProcessOperatorTest.java | 15 ++++---
 .../streaming/api/scala/ConnectedStreams.scala  |  2 +-
 6 files changed, 32 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 96a08d3..66601a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
@@ -243,10 +242,6 @@ public class ConnectedStreams<IN1, IN2> {
 	 * function can also query the time and set timers. When reacting to the firing of set timers
 	 * the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link RichCoProcessFunction}
-	 * can be used to gain access to features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 *
 	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
@@ -274,10 +269,6 @@ public class ConnectedStreams<IN1, IN2> {
 	 * this function can also query the time and set timers. When reacting to the firing of set
 	 * timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link RichCoProcessFunction}
-	 * can be used to gain access to features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 *
 	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index feff8fb..8811e32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -19,13 +19,11 @@
 package org.apache.flink.streaming.api.functions.co;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
-import java.io.Serializable;
-
 /**
  * A function that processes elements of two streams and produces a single output one.
  *
@@ -46,7 +44,9 @@ import java.io.Serializable;
  * @param <OUT> Output type.
  */
 @PublicEvolving
-public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable {
+public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = 1L;
 
 	/**
 	 * This method is called for each element in the first of the connected streams.
@@ -63,7 +63,7 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
+	public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
 
 	/**
 	 * This method is called for each element in the second of the connected streams.
@@ -80,7 +80,7 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+	public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
 
 	/**
 	 * Called when a timer set using {@link TimerService} fires.
@@ -95,14 +95,14 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */
-	void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
+	public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
 
 	/**
 	 * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
 	 * {@link #processElement2(Object, Context, Collector)}
 	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	interface Context {
+	public abstract class Context {
 
 		/**
 		 * Timestamp of the element currently being processed or timestamp of a firing timer.
@@ -110,21 +110,21 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 		 * <p>This might be {@code null}, for example if the time characteristic of your program
 		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
 		 */
-		Long timestamp();
+		public abstract Long timestamp();
 
 		/**
 		 * A {@link TimerService} for querying time and registering timers.
 		 */
-		TimerService timerService();
+		public abstract TimerService timerService();
 	}
 
 	/**
 	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	interface OnTimerContext extends Context {
+	public abstract class OnTimerContext extends Context {
 		/**
 		 * The {@link TimeDomain} of the firing timer.
 		 */
-		TimeDomain timeDomain();
+		public abstract TimeDomain timeDomain();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
deleted file mode 100644
index 0fcea91..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives
- * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
- * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link RichFunction#close()}.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichCoProcessFunction<IN1, IN2, OUT>
-		extends AbstractRichFunction
-		implements CoProcessFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index e6c3d3f..ed99815 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -44,11 +44,9 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 
 	private transient TimestampedCollector<OUT> collector;
 
-	private transient TimerService timerService;
+	private transient ContextImpl<IN1, IN2, OUT> context;
 
-	private transient ContextImpl context;
-
-	private transient OnTimerContextImpl onTimerContext;
+	private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
 
 	public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
@@ -62,10 +60,10 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		InternalTimerService<VoidNamespace> internalTimerService =
 				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
 
-		this.timerService = new SimpleTimerService(internalTimerService);
+		TimerService timerService = new SimpleTimerService(internalTimerService);
 
-		context = new ContextImpl(timerService);
-		onTimerContext = new OnTimerContextImpl(timerService);
+		context = new ContextImpl<>(userFunction, timerService);
+		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
 	}
 
 	@Override
@@ -108,13 +106,14 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		return collector;
 	}
 
-	private static class ContextImpl implements CoProcessFunction.Context {
+	private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
 
 		private final TimerService timerService;
 
 		private StreamRecord<?> element;
 
-		ContextImpl(TimerService timerService) {
+		ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
 
@@ -135,7 +134,8 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		}
 	}
 
-	private static class OnTimerContextImpl implements CoProcessFunction.OnTimerContext {
+	private static class OnTimerContextImpl<IN1, IN2, OUT>
+			extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
 
 		private final TimerService timerService;
 
@@ -143,7 +143,8 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 
 		private InternalTimer<?, VoidNamespace> timer;
 
-		OnTimerContextImpl(TimerService timerService) {
+		OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index eea428f..9d7c444 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -344,7 +343,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class WatermarkQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -366,7 +365,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class EventTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class EventTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -393,7 +392,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class EventTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+	private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -425,7 +424,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class ProcessingTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -452,7 +451,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class ProcessingTimeQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -474,7 +473,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class ProcessingTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+	private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -506,7 +505,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class BothTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class BothTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index a7325a4..cebda5d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction, RichCoProcessFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.util.Collector