You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/17 12:38:51 UTC
[5/9] flink git commit: [hotfix] [cep] Remove unused keySelector in operator.
[hotfix] [cep] Remove unused keySelector in operator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7ebcb07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7ebcb07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7ebcb07
Branch: refs/heads/master
Commit: f7ebcb07edecc06f523e4f46dfaa3ec10d1e90c8
Parents: 8e4db42
Author: kkloudas <kk...@gmail.com>
Authored: Mon May 15 14:49:00 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Wed May 17 14:37:33 2017 +0200
----------------------------------------------------------------------
.../cep/operator/AbstractKeyedCEPPatternOperator.java | 6 ------
.../org/apache/flink/cep/operator/CEPOperatorUtils.java | 6 ------
.../flink/cep/operator/KeyedCEPPatternOperator.java | 4 +---
.../cep/operator/TimeoutKeyedCEPPatternOperator.java | 4 +---
.../flink/cep/operator/CEPFrom12MigrationTest.java | 6 ------
.../flink/cep/operator/CEPMigration11to13Test.java | 2 --
.../org/apache/flink/cep/operator/CEPOperatorTest.java | 12 ++++--------
.../org/apache/flink/cep/operator/CEPRescalingTest.java | 1 -
8 files changed, 6 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7068bc4..bac21b3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -78,9 +77,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
private final TypeSerializer<IN> inputSerializer;
- // necessary to extract the key from the input elements
- private final KeySelector<IN, KEY> keySelector;
-
// necessary to serialize the set of seen keys
private final TypeSerializer<KEY> keySerializer;
@@ -112,14 +108,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
public AbstractKeyedCEPPatternOperator(
final TypeSerializer<IN> inputSerializer,
final boolean isProcessingTime,
- final KeySelector<IN, KEY> keySelector,
final TypeSerializer<KEY> keySerializer,
final NFACompiler.NFAFactory<IN> nfaFactory,
final boolean migratingFromOldKeyedOperator) {
this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
- this.keySelector = Preconditions.checkNotNull(keySelector);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 08424a4..e7b7e65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -63,7 +63,6 @@ public class CEPOperatorUtils {
// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
- KeySelector<T, K> keySelector = keyedStream.getKeySelector();
TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
patternStream = keyedStream.transform(
@@ -72,7 +71,6 @@ public class CEPOperatorUtils {
new KeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
- keySelector,
keySerializer,
nfaFactory,
true));
@@ -87,7 +85,6 @@ public class CEPOperatorUtils {
new KeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
- keySelector,
keySerializer,
nfaFactory,
false
@@ -127,7 +124,6 @@ public class CEPOperatorUtils {
// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
- KeySelector<T, K> keySelector = keyedStream.getKeySelector();
TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
patternStream = keyedStream.transform(
@@ -136,7 +132,6 @@ public class CEPOperatorUtils {
new TimeoutKeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
- keySelector,
keySerializer,
nfaFactory,
true));
@@ -151,7 +146,6 @@ public class CEPOperatorUtils {
new TimeoutKeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
- keySelector,
keySerializer,
nfaFactory,
false
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 4d68afb..fec226a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -19,7 +19,6 @@
package org.apache.flink.cep.operator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -44,12 +43,11 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
public KeyedCEPPatternOperator(
TypeSerializer<IN> inputSerializer,
boolean isProcessingTime,
- KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+ super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 9061bcb..5238878 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -19,7 +19,6 @@
package org.apache.flink.cep.operator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -44,12 +43,11 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
public TimeoutKeyedCEPPatternOperator(
TypeSerializer<IN> inputSerializer,
boolean isProcessingTime,
- KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+ super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index b0f47cc..d9efb1b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -78,7 +78,6 @@ public class CEPFrom12MigrationTest {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
true),
@@ -126,7 +125,6 @@ public class CEPFrom12MigrationTest {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
true),
@@ -200,7 +198,6 @@ public class CEPFrom12MigrationTest {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
true),
@@ -246,7 +243,6 @@ public class CEPFrom12MigrationTest {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
true),
@@ -332,7 +328,6 @@ public class CEPFrom12MigrationTest {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new SinglePatternNFAFactory(),
true),
@@ -371,7 +366,6 @@ public class CEPFrom12MigrationTest {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new SinglePatternNFAFactory(),
true),
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 8a97448..88a5703 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -103,7 +103,6 @@ public class CEPMigration11to13Test {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
true),
@@ -178,7 +177,6 @@ public class CEPMigration11to13Test {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
ByteSerializer.INSTANCE,
new NFAFactory(),
false),
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 436ad52..eb50dfd 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -228,7 +228,6 @@ public class CEPOperatorTest extends TestLogger {
new TimeoutKeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new NFAFactory(true),
true),
@@ -297,7 +296,7 @@ public class CEPOperatorTest extends TestLogger {
Event startEventK2 = new Event(43, "start", 1.0);
TestKeySelector keySelector = new TestKeySelector();
- KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false, keySelector);
+ KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false);
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
harness.open();
@@ -384,7 +383,6 @@ public class CEPOperatorTest extends TestLogger {
KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
IntSerializer.INSTANCE,
new ComplexNFAFactory(),
true);
@@ -475,7 +473,7 @@ public class CEPOperatorTest extends TestLogger {
Event startEventK2 = new Event(43, "start", 1.0);
TestKeySelector keySelector = new TestKeySelector();
- KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true, keySelector);
+ KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true);
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
harness.open();
@@ -555,7 +553,7 @@ public class CEPOperatorTest extends TestLogger {
KeySelector<Event, Integer> keySelector = new TestKeySelector();
return new KeyedOneInputStreamOperatorTestHarness<>(
- getKeyedCepOpearator(isProcessingTime, keySelector),
+ getKeyedCepOpearator(isProcessingTime),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
}
@@ -571,13 +569,11 @@ public class CEPOperatorTest extends TestLogger {
}
private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOpearator(
- boolean isProcessingTime,
- KeySelector<Event, Integer> keySelector) {
+ boolean isProcessingTime) {
return new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
isProcessingTime,
- keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
true);
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 9eb8da2..45d7215 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -344,7 +344,6 @@ public class CEPRescalingTest {
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
- keySelector,
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
new NFAFactory(),
true),