You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/07 15:26:33 UTC
[5/6] flink git commit: [FLINK-4957] Remove Key Serializer Parameter
getInternalTimerService()
[FLINK-4957] Remove Key Serializer Parameter getInternalTimerService()
It's not needed because we can get the key serializer from the keyed
state backend.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b873ac3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b873ac3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b873ac3
Branch: refs/heads/master
Commit: 0b873ac343343fd1d7716d075b54e66324374f47
Parents: 06fb9f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 14:34:47 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/KeyedStream.java | 2 +-
.../api/operators/AbstractStreamOperator.java | 13 ++++++-------
.../api/operators/StreamTimelyFlatMap.java | 9 ++-------
.../api/operators/co/CoStreamTimelyFlatMap.java | 11 ++---------
.../operators/windowing/WindowOperator.java | 2 +-
.../api/operators/AbstractStreamOperatorTest.java | 1 -
.../api/operators/TimelyFlatMapTest.java | 18 +++++++++---------
.../api/operators/co/TimelyCoFlatMapTest.java | 16 ++++++++--------
8 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index c938f5b..4063b60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -231,7 +231,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
TypeInformation<R> outputType) {
StreamTimelyFlatMap<KEY, T, R> operator =
- new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
+ new StreamTimelyFlatMap<>(clean(flatMapper));
return transform("Flat Map", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7b555b7..839abf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -772,24 +772,21 @@ public abstract class AbstractStreamOperator<OUT>
*
* @param name The name of the requested timer service. If no service exists under the given
* name a new one will be created and returned.
- * @param keySerializer {@code TypeSerializer} for the keys of the timers.
* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
* @param triggerable The {@link Triggerable} that should be invoked when timers fire
*
- * @param <K> The type of the timer keys.
* @param <N> The type of the timer namespace.
*/
- public <K, N> InternalTimerService<N> getInternalTimerService(
+ public <N> InternalTimerService<N> getInternalTimerService(
String name,
- TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
- Triggerable<K, N> triggerable) {
+ Triggerable<?, N> triggerable) {
if (getKeyedStateBackend() == null) {
throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
}
@SuppressWarnings("unchecked")
- HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name);
+ HeapInternalTimerService<Object, N> timerService = (HeapInternalTimerService<Object, N>) timerServices.get(name);
if (timerService == null) {
timerService = new HeapInternalTimerService<>(
@@ -799,7 +796,9 @@ public abstract class AbstractStreamOperator<OUT>
getRuntimeContext().getProcessingTimeService());
timerServices.put(name, timerService);
}
- timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Triggerable rawTriggerable = (Triggerable) triggerable;
+ timerService.startTimerService(getKeyedStateBackend().getKeySerializer(), namespaceSerializer, rawTriggerable);
return timerService;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
index 962f264..d507ba6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
@@ -34,17 +33,13 @@ public class StreamTimelyFlatMap<K, IN, OUT>
private static final long serialVersionUID = 1L;
- private final TypeSerializer<K> keySerializer;
-
private transient TimestampedCollector<OUT> collector;
private transient TimerService timerService;
- public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, TimelyFlatMapFunction<IN, OUT> flatMapper) {
+ public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
- this.keySerializer = keySerializer;
-
chainingStrategy = ChainingStrategy.ALWAYS;
}
@@ -54,7 +49,7 @@ public class StreamTimelyFlatMap<K, IN, OUT>
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
this.timerService = new SimpleTimerService(internalTimerService);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
index df2320f..212aafd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.co;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
@@ -40,18 +39,12 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
private static final long serialVersionUID = 1L;
- private final TypeSerializer<K> keySerializer;
-
private transient TimestampedCollector<OUT> collector;
private transient TimerService timerService;
- public CoStreamTimelyFlatMap(
- TypeSerializer<K> keySerializer,
- TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+ public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
-
- this.keySerializer = keySerializer;
}
@Override
@@ -60,7 +53,7 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
this.timerService = new SimpleTimerService(internalTimerService);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index c465767..229d97d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -187,7 +187,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
timestampedCollector = new TimestampedCollector<>(output);
internalTimerService =
- getInternalTimerService("window-timers", keySerializer, windowSerializer, this);
+ getInternalTimerService("window-timers", windowSerializer, this);
context = new Context(null, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index fd05353..2fb0089 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -448,7 +448,6 @@ public class AbstractStreamOperatorTest {
this.timerService = getInternalTimerService(
"test-timers",
- IntSerializer.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
this);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
index f3b09eb..46b52ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testCurrentEventTime() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+ new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -79,7 +79,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testCurrentProcessingTime() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+ new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -107,7 +107,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testEventTimeTimers() throws Exception {
StreamTimelyFlatMap<Integer, Integer, Integer> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -137,7 +137,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testProcessingTimeTimers() throws Exception {
StreamTimelyFlatMap<Integer, Integer, Integer> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -166,7 +166,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testEventTimeTimerWithState() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -206,7 +206,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testProcessingTimeTimerWithState() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -239,7 +239,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testSnapshotAndRestore() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -254,7 +254,7 @@ public class TimelyFlatMapTest extends TestLogger {
testHarness.close();
- operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -352,7 +352,7 @@ public class TimelyFlatMapTest extends TestLogger {
private static final long serialVersionUID = 1L;
private final ValueStateDescriptor<Integer> state =
- new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
+ new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
private final TimeDomain timeDomain;
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
index 25808f4..cb5d6c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testCurrentEventTime() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -85,7 +85,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testCurrentProcessingTime() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -117,7 +117,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testEventTimeTimers() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -156,7 +156,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testProcessingTimeTimers() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -193,7 +193,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testEventTimeTimerWithState() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -242,7 +242,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testProcessingTimeTimerWithState() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -279,7 +279,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testSnapshotAndRestore() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -299,7 +299,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
testHarness.close();
- operator = new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
operator,