You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/18 04:48:44 UTC
kafka git commit: KAFKA-4275: Check of State-Store-assignment to
Processor-Nodes is not enabled
Repository: kafka
Updated Branches:
refs/heads/trunk 4e0b0b83a -> 925310aac
KAFKA-4275: Check of State-Store-assignment to Processor-Nodes is not enabled
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy, Guozhang Wang
Closes #1992 from mjsax/kafka-4275-stateStoreCheck
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/925310aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/925310aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/925310aa
Branch: refs/heads/trunk
Commit: 925310aac0b6a0fb32e3e2d614198ffc78f34f96
Parents: 4e0b0b8
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Oct 17 21:48:40 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 17 21:48:40 2016 -0700
----------------------------------------------------------------------
...ractKTableKTableJoinValueGetterSupplier.java | 45 +++++++++++++
.../kstream/internals/KStreamAggregate.java | 4 ++
.../streams/kstream/internals/KStreamImpl.java | 5 +-
.../kstream/internals/KStreamReduce.java | 4 ++
.../internals/KStreamWindowAggregate.java | 6 +-
.../kstream/internals/KStreamWindowReduce.java | 4 ++
.../kstream/internals/KTableAggregate.java | 4 ++
.../streams/kstream/internals/KTableFilter.java | 4 ++
.../streams/kstream/internals/KTableImpl.java | 8 ++-
.../kstream/internals/KTableKTableJoin.java | 15 +++--
.../kstream/internals/KTableKTableLeftJoin.java | 16 +++--
.../internals/KTableKTableOuterJoin.java | 21 ++++---
.../internals/KTableKTableRightJoin.java | 15 +++--
.../kstream/internals/KTableMapValues.java | 4 ++
.../streams/kstream/internals/KTableReduce.java | 4 ++
.../kstream/internals/KTableRepartitionMap.java | 5 ++
.../KTableSourceValueGetterSupplier.java | 5 ++
.../internals/KTableValueGetterSupplier.java | 1 +
.../internals/ProcessorContextImpl.java | 10 +--
.../streams/processor/TopologyBuilderTest.java | 66 ++++++++++++++++++++
20 files changed, 214 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
new file mode 100644
index 0000000..fa6d2aa
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.ArrayList;
+
+public abstract class AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> {
+ final protected KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+ final protected KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+
+ public AbstractKTableKTableJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
+ final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+ this.valueGetterSupplier1 = valueGetterSupplier1;
+ this.valueGetterSupplier2 = valueGetterSupplier2;
+ }
+
+ @Override
+ public String[] storeNames() {
+ final String[] storeNames1 = valueGetterSupplier1.storeNames();
+ final String[] storeNames2 = valueGetterSupplier2.storeNames();
+ final ArrayList<String> stores = new ArrayList<>(storeNames1.length + storeNames2.length);
+ for (final String storeName : storeNames1) {
+ stores.add(storeName);
+ }
+ for (final String storeName : storeNames2) {
+ stores.add(storeName);
+ }
+ return stores.toArray(new String[stores.size()]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 428c513..d596d5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -94,6 +94,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
return new KStreamAggregateValueGetter();
}
+ @Override
+ public String[] storeNames() {
+ return new String[]{storeName};
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bf345e1..b6c3401 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -584,6 +584,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String name = topology.newName(LEFTJOIN_NAME);
topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
+ topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).valueGetterSupplier().storeNames());
topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
return new KStreamImpl<>(topology, name, allSourceNodes, false);
@@ -703,8 +704,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
- topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);
+ topology.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
+ topology.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 6d24284..1408169 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -93,6 +93,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
return new KStreamReduceValueGetter();
}
+ @Override
+ public String[] storeNames() {
+ return new String[]{storeName};
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 437d304..718e52b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
@@ -134,6 +134,10 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
return new KStreamWindowAggregateValueGetter();
}
+ @Override
+ public String[] storeNames() {
+ return new String[]{storeName};
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 2a47f72..0b93468 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -130,6 +130,10 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
return new KStreamWindowReduceValueGetter();
}
+ @Override
+ public String[] storeNames() {
+ return new String[]{storeName};
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 3f2ab97..2ef4709 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -105,6 +105,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
return new KTableAggregateValueGetter();
}
+ @Override
+ public String[] storeNames() {
+ return new String[]{storeName};
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index ff0c67f..059a36f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -52,6 +52,10 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
return new KTableFilterValueGetter(parentValueGetterSupplier.get());
}
+ @Override
+ public String[] storeNames() {
+ return parentValueGetterSupplier.storeNames();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ebe00d8..c53e761 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
@@ -32,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.io.FileNotFoundException;
@@ -301,6 +301,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+ topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
+ topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@@ -327,6 +329,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+ topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
+ topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@@ -352,6 +356,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+ topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
+ topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 36424d1..cbd626d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -36,13 +36,18 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new KTableValueGetterSupplier<K, R>() {
+ return new KTableKTableJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+ }
- public KTableValueGetter<K, R> get() {
- return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
- }
+ private class KTableKTableJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
- };
+ public KTableKTableJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+ super(valueGetterSupplier1, valueGetterSupplier2);
+ }
+
+ public KTableValueGetter<K, R> get() {
+ return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+ }
}
private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 996ebc3..4bee38c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -36,15 +36,21 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new KTableValueGetterSupplier<K, R>() {
+ return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+ }
- public KTableValueGetter<K, R> get() {
- return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
- }
+ private class KTableKTableLeftJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
+
+ public KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+ super(valueGetterSupplier1, valueGetterSupplier2);
+ }
- };
+ public KTableValueGetter<K, R> get() {
+ return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+ }
}
+
private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 2a0d477..ad7dbde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,13 +36,18 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new KTableValueGetterSupplier<K, R>() {
+ return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+ }
- public KTableValueGetter<K, R> get() {
- return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
- }
+ private class KTableKTableOuterJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
- };
+ public KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+ super(valueGetterSupplier1, valueGetterSupplier2);
+ }
+
+ public KTableValueGetter<K, R> get() {
+ return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+ }
}
private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index fa41ed3..80aadaa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -37,13 +37,18 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new KTableValueGetterSupplier<K, R>() {
+ return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+ }
- public KTableValueGetter<K, R> get() {
- return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
- }
+ private class KTableKTableRightJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
- };
+ public KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+ super(valueGetterSupplier1, valueGetterSupplier2);
+ }
+
+ public KTableValueGetter<K, R> get() {
+ return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+ }
}
private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change<V1>> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 244d8ba..daabb00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -50,6 +50,10 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
}
+ @Override
+ public String[] storeNames() {
+ return parentValueGetterSupplier.storeNames();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index a5457a5..8c2e5f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -103,6 +103,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
return new KTableAggregateValueGetter();
}
+ @Override
+ public String[] storeNames() {
+ return new String[]{storeName};
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 939a1df..42bb13a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -53,6 +53,11 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
public KTableValueGetter<K, KeyValue<K1, V1>> get() {
return new KTableMapValueGetter(parentValueGetterSupplier.get());
}
+
+ @Override
+ public String[] storeNames() {
+ throw new StreamsException("Underlying state store not accessible due to repartitioning.");
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index fe41fa0..e59a3eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -32,6 +32,11 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
return new KTableSourceValueGetter();
}
+ @Override
+ public String[] storeNames() {
+ return new String[]{storeName};
+ }
+
private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
KeyValueStore<K, V> store = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
index 1ab6ba6..2423bf0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
@@ -21,4 +21,5 @@ public interface KTableValueGetterSupplier<K, V> {
KTableValueGetter<K, V> get();
+ String[] storeNames();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index f4d4e83..195e5a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -18,11 +18,11 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -133,9 +133,9 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
if (node == null)
throw new TopologyBuilderException("Accessing from an unknown node");
- // TODO: restore this once we fix the ValueGetter initialization issue
- //if (!node.stateStores.contains(name))
- // throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
+ if (!node.stateStores.contains(name)) {
+ throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
+ }
return stateMgr.getStore(name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/925310aa/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7fe5170..3f45967 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
@@ -24,9 +26,11 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.junit.Test;
import java.util.Arrays;
@@ -517,4 +521,66 @@ public class TopologyBuilderTest {
assertEquals(1, properties.size());
}
+ @Test(expected = TopologyBuilderException.class)
+ public void shouldThroughOnUnassignedStateStoreAccess() {
+ final String sourceNodeName = "source";
+ final String goodNodeName = "goodGuy";
+ final String badNodeName = "badGuy";
+
+ final Properties config = new Properties();
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+ final StreamsConfig streamsConfig = new StreamsConfig(config);
+
+ try {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder
+ .addSource(sourceNodeName, "topic")
+ .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+ .addStateStore(
+ Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+ goodNodeName)
+ .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
+ final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME);
+ driver.process("topic", null, null);
+
+ } catch (final StreamsException e) {
+ final Throwable cause = e.getCause();
+ if (cause != null
+ && cause instanceof TopologyBuilderException
+ && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
+ throw (TopologyBuilderException) cause;
+ } else {
+ throw new RuntimeException("Did expect different exception. Did catch:", e);
+ }
+ }
+ }
+
+ private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+ final static String STORE_NAME = "store";
+
+ @Override
+ public Processor get() {
+ return new Processor() {
+ @Override
+ public void init(ProcessorContext context) {
+ context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(Object key, Object value) {
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ }
+ }
+
}