You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/17 12:38:51 UTC

[5/9] flink git commit: [hotfix] [cep] Remove unused keySelector in operator.

[hotfix] [cep] Remove unused keySelector in operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7ebcb07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7ebcb07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7ebcb07

Branch: refs/heads/master
Commit: f7ebcb07edecc06f523e4f46dfaa3ec10d1e90c8
Parents: 8e4db42
Author: kkloudas <kk...@gmail.com>
Authored: Mon May 15 14:49:00 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Wed May 17 14:37:33 2017 +0200

----------------------------------------------------------------------
 .../cep/operator/AbstractKeyedCEPPatternOperator.java   |  6 ------
 .../org/apache/flink/cep/operator/CEPOperatorUtils.java |  6 ------
 .../flink/cep/operator/KeyedCEPPatternOperator.java     |  4 +---
 .../cep/operator/TimeoutKeyedCEPPatternOperator.java    |  4 +---
 .../flink/cep/operator/CEPFrom12MigrationTest.java      |  6 ------
 .../flink/cep/operator/CEPMigration11to13Test.java      |  2 --
 .../org/apache/flink/cep/operator/CEPOperatorTest.java  | 12 ++++--------
 .../org/apache/flink/cep/operator/CEPRescalingTest.java |  1 -
 8 files changed, 6 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7068bc4..bac21b3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -78,9 +77,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
 	private final TypeSerializer<IN> inputSerializer;
 
-	// necessary to extract the key from the input elements
-	private final KeySelector<IN, KEY> keySelector;
-
 	// necessary to serialize the set of seen keys
 	private final TypeSerializer<KEY> keySerializer;
 
@@ -112,14 +108,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	public AbstractKeyedCEPPatternOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
-			final KeySelector<IN, KEY> keySelector,
 			final TypeSerializer<KEY> keySerializer,
 			final NFACompiler.NFAFactory<IN> nfaFactory,
 			final boolean migratingFromOldKeyedOperator) {
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
 		this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
-		this.keySelector = Preconditions.checkNotNull(keySelector);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 08424a4..e7b7e65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -63,7 +63,6 @@ public class CEPOperatorUtils {
 			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
 			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
 
-			KeySelector<T, K> keySelector = keyedStream.getKeySelector();
 			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 
 			patternStream = keyedStream.transform(
@@ -72,7 +71,6 @@ public class CEPOperatorUtils {
 				new KeyedCEPPatternOperator<>(
 					inputSerializer,
 					isProcessingTime,
-					keySelector,
 					keySerializer,
 					nfaFactory,
 					true));
@@ -87,7 +85,6 @@ public class CEPOperatorUtils {
 				new KeyedCEPPatternOperator<>(
 					inputSerializer,
 					isProcessingTime,
-					keySelector,
 					keySerializer,
 					nfaFactory,
 					false
@@ -127,7 +124,6 @@ public class CEPOperatorUtils {
 			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
 			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
 
-			KeySelector<T, K> keySelector = keyedStream.getKeySelector();
 			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 
 			patternStream = keyedStream.transform(
@@ -136,7 +132,6 @@ public class CEPOperatorUtils {
 				new TimeoutKeyedCEPPatternOperator<>(
 					inputSerializer,
 					isProcessingTime,
-					keySelector,
 					keySerializer,
 					nfaFactory,
 					true));
@@ -151,7 +146,6 @@ public class CEPOperatorUtils {
 				new TimeoutKeyedCEPPatternOperator<>(
 					inputSerializer,
 					isProcessingTime,
-					keySelector,
 					keySerializer,
 					nfaFactory,
 					false

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 4d68afb..fec226a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -44,12 +43,11 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
 	public KeyedCEPPatternOperator(
 			TypeSerializer<IN> inputSerializer,
 			boolean isProcessingTime,
-			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			NFACompiler.NFAFactory<IN> nfaFactory,
 			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+		super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 9061bcb..5238878 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -44,12 +43,11 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 	public TimeoutKeyedCEPPatternOperator(
 			TypeSerializer<IN> inputSerializer,
 			boolean isProcessingTime,
-			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			NFACompiler.NFAFactory<IN> nfaFactory,
 			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+		super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index b0f47cc..d9efb1b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -78,7 +78,6 @@ public class CEPFrom12MigrationTest {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
 								true),
@@ -126,7 +125,6 @@ public class CEPFrom12MigrationTest {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
 								true),
@@ -200,7 +198,6 @@ public class CEPFrom12MigrationTest {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
 								true),
@@ -246,7 +243,6 @@ public class CEPFrom12MigrationTest {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
 								true),
@@ -332,7 +328,6 @@ public class CEPFrom12MigrationTest {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								IntSerializer.INSTANCE,
 								new SinglePatternNFAFactory(),
 								true),
@@ -371,7 +366,6 @@ public class CEPFrom12MigrationTest {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								IntSerializer.INSTANCE,
 								new SinglePatternNFAFactory(),
 								true),

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 8a97448..88a5703 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -103,7 +103,6 @@ public class CEPMigration11to13Test {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
 								true),
@@ -178,7 +177,6 @@ public class CEPMigration11to13Test {
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
-								keySelector,
 								ByteSerializer.INSTANCE,
 								new NFAFactory(),
 								false),

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 436ad52..eb50dfd 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -228,7 +228,6 @@ public class CEPOperatorTest extends TestLogger {
 			new TimeoutKeyedCEPPatternOperator<>(
 				Event.createTypeSerializer(),
 				false,
-				keySelector,
 				IntSerializer.INSTANCE,
 				new NFAFactory(true),
 				true),
@@ -297,7 +296,7 @@ public class CEPOperatorTest extends TestLogger {
 		Event startEventK2 = new Event(43, "start", 1.0);
 
 		TestKeySelector keySelector = new TestKeySelector();
-		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false, keySelector);
+		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
 		harness.open();
@@ -384,7 +383,6 @@ public class CEPOperatorTest extends TestLogger {
 		KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
 				Event.createTypeSerializer(),
 				false,
-				keySelector,
 				IntSerializer.INSTANCE,
 				new ComplexNFAFactory(),
 				true);
@@ -475,7 +473,7 @@ public class CEPOperatorTest extends TestLogger {
 		Event startEventK2 = new Event(43, "start", 1.0);
 
 		TestKeySelector keySelector = new TestKeySelector();
-		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true, keySelector);
+		KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
 		harness.open();
@@ -555,7 +553,7 @@ public class CEPOperatorTest extends TestLogger {
 		KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
 		return new KeyedOneInputStreamOperatorTestHarness<>(
-			getKeyedCepOpearator(isProcessingTime, keySelector),
+			getKeyedCepOpearator(isProcessingTime),
 			keySelector,
 			BasicTypeInfo.INT_TYPE_INFO);
 	}
@@ -571,13 +569,11 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOpearator(
-			boolean isProcessingTime,
-			KeySelector<Event, Integer> keySelector) {
+			boolean isProcessingTime) {
 
 		return new KeyedCEPPatternOperator<>(
 			Event.createTypeSerializer(),
 			isProcessingTime,
-			keySelector,
 			IntSerializer.INSTANCE,
 			new NFAFactory(),
 			true);

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 9eb8da2..45d7215 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -344,7 +344,6 @@ public class CEPRescalingTest {
 			new KeyedCEPPatternOperator<>(
 				Event.createTypeSerializer(),
 				false,
-				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				new NFAFactory(),
 				true),