You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/07 01:13:19 UTC
[1/2] beam git commit: Revise MapState and SetState APIs to leverage
ReadableState
Repository: beam
Updated Branches:
refs/heads/master b92032ff6 -> a8edbb81f
Revise MapState and SetState APIs to leverage ReadableState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/604be670
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/604be670
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/604be670
Branch: refs/heads/master
Commit: 604be6703daadfcf085d69ee2859577218d6b3d4
Parents: b92032f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Mar 28 20:08:45 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 15:36:09 2017 -0700
----------------------------------------------------------------------
.../runners/core/InMemoryStateInternals.java | 84 ++++---------------
.../core/InMemoryStateInternalsTest.java | 87 +++++++++-----------
.../CopyOnAccessInMemoryStateInternalsTest.java | 16 ++--
.../apache/beam/sdk/util/state/MapState.java | 52 +++++-------
.../beam/sdk/util/state/ReadableStates.java | 45 ++++++++++
.../apache/beam/sdk/util/state/SetState.java | 34 +-------
.../apache/beam/sdk/transforms/ParDoTest.java | 6 +-
7 files changed, 134 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 0d5b058..55b7fc2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.core;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -42,6 +40,7 @@ import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.ReadableStates;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateContext;
@@ -468,13 +467,15 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public boolean contains(T t) {
- return contents.contains(t);
+ public ReadableState<Boolean> contains(T t) {
+ return ReadableStates.immediate(contents.contains(t));
}
@Override
- public boolean addIfAbsent(T t) {
- return contents.add(t);
+ public ReadableState<Boolean> addIfAbsent(T t) {
+ boolean alreadyContained = contents.contains(t);
+ contents.add(t);
+ return ReadableStates.immediate(!alreadyContained);
}
@Override
@@ -483,33 +484,6 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public SetState<T> readLater(Iterable<T> elements) {
- return this;
- }
-
- @Override
- public boolean containsAny(Iterable<T> elements) {
- elements = checkNotNull(elements);
- for (T t : elements) {
- if (contents.contains(t)) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public boolean containsAll(Iterable<T> elements) {
- elements = checkNotNull(elements);
- for (T t : elements) {
- if (!contents.contains(t)) {
- return false;
- }
- }
- return true;
- }
-
- @Override
public InMemorySet<T> readLater() {
return this;
}
@@ -565,8 +539,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public V get(K key) {
- return contents.get(key);
+ public ReadableState<V> get(K key) {
+ return ReadableStates.immediate(contents.get(key));
}
@Override
@@ -575,13 +549,13 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public V putIfAbsent(K key, V value) {
+ public ReadableState<V> putIfAbsent(K key, V value) {
V v = contents.get(key);
if (v == null) {
v = contents.put(key, value);
}
- return v;
+ return ReadableStates.immediate(v);
}
@Override
@@ -590,42 +564,18 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public Iterable<V> get(Iterable<K> keys) {
- List<V> values = new ArrayList<>();
- for (K k : keys) {
- values.add(contents.get(k));
- }
- return values;
+ public ReadableState<Iterable<K>> keys() {
+ return ReadableStates.immediate((Iterable<K>) contents.keySet());
}
@Override
- public MapState<K, V> getLater(K k) {
- return this;
- }
-
- @Override
- public MapState<K, V> getLater(Iterable<K> keys) {
- return this;
- }
-
- @Override
- public Iterable<K> keys() {
- return contents.keySet();
- }
-
- @Override
- public Iterable<V> values() {
- return contents.values();
- }
-
- @Override
- public MapState<K, V> iterateLater() {
- return this;
+ public ReadableState<Iterable<V>> values() {
+ return ReadableStates.immediate((Iterable<V>) contents.values());
}
@Override
- public Iterable<Map.Entry<K, V>> iterate() {
- return contents.entrySet();
+ public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+ return ReadableStates.immediate((Iterable<Map.Entry<K, V>>) contents.entrySet());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index e4fb5c1..34ddae6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -17,7 +17,9 @@
*/
package org.apache.beam.runners.core;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -26,7 +28,6 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -112,10 +113,10 @@ public class InMemoryStateInternalsTest {
assertThat(value.read(), Matchers.emptyIterable());
value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+ assertThat(value.read(), containsInAnyOrder("hello"));
value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+ assertThat(value.read(), containsInAnyOrder("hello", "world"));
value.clear();
assertThat(value.read(), Matchers.emptyIterable());
@@ -147,7 +148,7 @@ public class InMemoryStateInternalsTest {
StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
// Reading the merged bag gets both the contents
- assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!"));
assertThat(bag2.read(), Matchers.emptyIterable());
}
@@ -164,7 +165,7 @@ public class InMemoryStateInternalsTest {
StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
// Reading the merged bag gets both the contents
- assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!"));
assertThat(bag1.read(), Matchers.emptyIterable());
assertThat(bag2.read(), Matchers.emptyIterable());
}
@@ -179,41 +180,32 @@ public class InMemoryStateInternalsTest {
// empty
assertThat(value.read(), Matchers.emptyIterable());
- assertFalse(value.contains("A"));
- assertFalse(value.containsAny(Collections.singletonList("A")));
+ assertFalse(value.contains("A").read());
// add
value.add("A");
value.add("B");
value.add("A");
- assertFalse(value.addIfAbsent("B"));
- assertThat(value.read(), Matchers.containsInAnyOrder("A", "B"));
+ assertFalse(value.addIfAbsent("B").read());
+ assertThat(value.read(), containsInAnyOrder("A", "B"));
// remove
value.remove("A");
- assertThat(value.read(), Matchers.containsInAnyOrder("B"));
+ assertThat(value.read(), containsInAnyOrder("B"));
value.remove("C");
- assertThat(value.read(), Matchers.containsInAnyOrder("B"));
+ assertThat(value.read(), containsInAnyOrder("B"));
// contains
- assertFalse(value.contains("A"));
- assertTrue(value.contains("B"));
+ assertFalse(value.contains("A").read());
+ assertTrue(value.contains("B").read());
value.add("C");
value.add("D");
- // containsAny
- assertTrue(value.containsAny(Arrays.asList("A", "C")));
- assertFalse(value.containsAny(Arrays.asList("A", "E")));
-
- // containsAll
- assertTrue(value.containsAll(Arrays.asList("B", "C")));
- assertFalse(value.containsAll(Arrays.asList("A", "B")));
-
// readLater
- assertThat(value.readLater().read(), Matchers.containsInAnyOrder("B", "C", "D"));
- SetState<String> later = value.readLater(Arrays.asList("A", "C", "D"));
- assertTrue(later.containsAll(Arrays.asList("C", "D")));
- assertFalse(later.contains("A"));
+ assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D"));
+ SetState<String> later = value.readLater();
+ assertThat(later.read(), hasItems("C", "D"));
+ assertFalse(later.contains("A").read());
// clear
value.clear();
@@ -248,7 +240,7 @@ public class InMemoryStateInternalsTest {
StateMerging.mergeSets(Arrays.asList(set1, set2), set1);
// Reading the merged set gets both the contents
- assertThat(set1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!"));
assertThat(set2.read(), Matchers.emptyIterable());
}
@@ -266,7 +258,7 @@ public class InMemoryStateInternalsTest {
StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3);
// Reading the merged set gets both the contents
- assertThat(set3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!"));
assertThat(set1.read(), Matchers.emptyIterable());
assertThat(set2.read(), Matchers.emptyIterable());
}
@@ -330,49 +322,46 @@ public class InMemoryStateInternalsTest {
assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR))));
// put
- assertThat(value.iterate(), Matchers.emptyIterable());
+ assertThat(value.entries().read(), Matchers.emptyIterable());
value.put("A", 1);
value.put("B", 2);
value.put("A", 11);
- assertThat(value.putIfAbsent("B", 22), equalTo(2));
- assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("A", 11),
+ assertThat(value.putIfAbsent("B", 22).read(), equalTo(2));
+ assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11),
MapEntry.of("B", 2)));
// remove
value.remove("A");
- assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("B", 2)));
+ assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
value.remove("C");
- assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("B", 2)));
+ assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
// get
- assertNull(value.get("A"));
- assertThat(value.get("B"), equalTo(2));
+ assertNull(value.get("A").read());
+ assertThat(value.get("B").read(), equalTo(2));
value.put("C", 3);
value.put("D", 4);
- assertThat(value.get("C"), equalTo(3));
- assertThat(value.get(Collections.singletonList("D")), Matchers.containsInAnyOrder(4));
- assertThat(value.get(Arrays.asList("B", "C")), Matchers.containsInAnyOrder(2, 3));
+ assertThat(value.get("C").read(), equalTo(3));
// iterate
value.put("E", 5);
value.remove("C");
- assertThat(value.keys(), Matchers.containsInAnyOrder("B", "D", "E"));
- assertThat(value.values(), Matchers.containsInAnyOrder(2, 4, 5));
- assertThat(value.iterate(), Matchers.containsInAnyOrder(
- MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+ assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E"));
+ assertThat(value.values().read(), containsInAnyOrder(2, 4, 5));
+ assertThat(
+ value.entries().read(),
+ containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
// readLater
- assertThat(value.getLater("B").get("B"), equalTo(2));
- assertNull(value.getLater("A").get("A"));
- MapState<String, Integer> later = value.getLater(Arrays.asList("C", "D"));
- assertNull(later.get("C"));
- assertThat(later.get("D"), equalTo(4));
- assertThat(value.iterateLater().iterate(), Matchers.containsInAnyOrder(
- MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+ assertThat(value.get("B").readLater().read(), equalTo(2));
+ assertNull(value.get("A").readLater().read());
+ assertThat(
+ value.entries().readLater().read(),
+ containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
// clear
value.clear();
- assertThat(value.iterate(), Matchers.emptyIterable());
+ assertThat(value.entries().read(), Matchers.emptyIterable());
assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.sameInstance(value));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 142af32..68c6613 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -201,24 +201,24 @@ public class CopyOnAccessInMemoryStateInternalsTest {
StateTag<Object, MapState<String, Integer>> valueTag =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag);
- assertThat(underlyingValue.iterate(), emptyIterable());
+ assertThat(underlyingValue.entries().read(), emptyIterable());
underlyingValue.put("hello", 1);
- assertThat(underlyingValue.get("hello"), equalTo(1));
+ assertThat(underlyingValue.get("hello").read(), equalTo(1));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag);
- assertThat(copyOnAccessState.get("hello"), equalTo(1));
+ assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
copyOnAccessState.put("world", 4);
- assertThat(copyOnAccessState.get("hello"), equalTo(1));
- assertThat(copyOnAccessState.get("world"), equalTo(4));
- assertThat(underlyingValue.get("hello"), equalTo(1));
- assertNull(underlyingValue.get("world"));
+ assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
+ assertThat(copyOnAccessState.get("world").read(), equalTo(4));
+ assertThat(underlyingValue.get("hello").read(), equalTo(1));
+ assertNull(underlyingValue.get("world").read());
MapState<String, Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag);
- assertThat(underlyingValue.iterate(), equalTo(reReadUnderlyingValue.iterate()));
+ assertThat(underlyingValue.entries().read(), equalTo(reReadUnderlyingValue.entries().read()));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
index 85d99d6..fb7e807 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
@@ -30,21 +30,20 @@ import java.util.Map;
public interface MapState<K, V> extends State {
/**
- * Returns the value to which the specified key is mapped in the state.
- */
- V get(K key);
-
- /**
* Associates the specified value with the specified key in this state.
*/
void put(K key, V value);
/**
- * If the specified key is not already associated with a value (or is mapped
- * to {@code null}) associates it with the given value and returns
- * {@code null}, else returns the current value.
+ * A deferred read-followed-by-write.
+ *
+ * <p>When {@code read()} is called on the result or state is committed, it forces a read of the
+ * map and reconciliation with any pending modifications.
+ *
+ * <p>If the specified key is not already associated with a value (or is mapped to {@code null})
+ * associates it with the given value and returns {@code null}, else returns the current value.
*/
- V putIfAbsent(K key, V value);
+ ReadableState<V> putIfAbsent(K key, V value);
/**
* Removes the mapping for a key from this map if it is present.
@@ -52,42 +51,29 @@ public interface MapState<K, V> extends State {
void remove(K key);
/**
- * A bulk get.
- * @param keys the keys to search for
- * @return a iterable view of values, maybe some values is null.
- * The order of values corresponds to the order of the keys.
+ * A deferred lookup.
+ *
+ * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()}
+ * on the results.
+ *
+ * <p>When {@code read()} is called, a particular state implementation is encouraged to perform
+ * all pending reads in a single batch.
*/
- Iterable<V> get(Iterable<K> keys);
-
- /**
- * Indicate that specified key will be read later.
- */
- MapState<K, V> getLater(K k);
-
- /**
- * Indicate that specified batch keys will be read later.
- */
- MapState<K, V> getLater(Iterable<K> keys);
+ ReadableState<V> get(K key);
/**
* Returns an iterable view of the keys contained in this map.
*/
- Iterable<K> keys();
+ ReadableState<Iterable<K>> keys();
/**
* Returns an iterable view of the values contained in this map.
*/
- Iterable<V> values();
-
- /**
- * Indicate that all key-values will be read later.
- */
- MapState<K, V> iterateLater();
+ ReadableState<Iterable<V>> values();
/**
* Returns an iterable view of all key-values.
*/
- Iterable<Map.Entry<K, V>> iterate();
-
+ ReadableState<Iterable<Map.Entry<K, V>>> entries();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
new file mode 100644
index 0000000..819eda6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Utilities for constructing and manipulating {@link ReadableState} instances.
+ */
+@Experimental(Kind.STATE)
+public class ReadableStates {
+
+ /**
+ * A {@link ReadableState} constructed from a constant value, hence immediately available.
+ */
+ public static <T> ReadableState<T> immediate(final T value) {
+ return new ReadableState<T>() {
+ @Override
+ public T read() {
+ return value;
+ }
+
+ @Override
+ public ReadableState<T> readLater() {
+ return this;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
index 5c907d5..56ea510 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
@@ -27,45 +27,19 @@ public interface SetState<T> extends GroupingState<T, Iterable<T>> {
/**
* Returns true if this set contains the specified element.
*/
- boolean contains(T t);
+ ReadableState<Boolean> contains(T t);
/**
- * Add a value to the buffer if it is not already present.
- * If this set already contains the element, the call leaves the set
- * unchanged and returns false.
+ * Ensures a value is a member of the set, returning {@code true} if it was added and {@code
+ * false} otherwise.
*/
- boolean addIfAbsent(T t);
+ ReadableState<Boolean> addIfAbsent(T t);
/**
* Removes the specified element from this set if it is present.
*/
void remove(T t);
- /**
- * Indicate that elements will be read later.
- * @param elements to be read later
- * @return this for convenient chaining
- */
- SetState<T> readLater(Iterable<T> elements);
-
- /**
- * <p>Checks if SetState contains any given elements.</p>
- *
- * @param elements the elements to search for
- * @return the {@code true} if any of the elements are found,
- * {@code false} if no match
- */
- boolean containsAny(Iterable<T> elements);
-
- /**
- * <p>Checks if SetState contains all given elements.</p>
- *
- * @param elements the elements to find
- * @return true if the SetState contains all elements,
- * false if not
- */
- boolean containsAll(Iterable<T> elements);
-
@Override
SetState<T> readLater();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e305da1..b429eab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2228,7 +2228,7 @@ public class ParDoTest implements Serializable {
state.put(value.getKey(), value.getValue());
count.add(1);
if (count.read() >= 4) {
- Iterable<Map.Entry<String, Integer>> iterate = state.iterate();
+ Iterable<Map.Entry<String, Integer>> iterate = state.entries().read();
for (Map.Entry<String, Integer> entry : iterate) {
c.output(KV.of(entry.getKey(), entry.getValue()));
}
@@ -2274,7 +2274,7 @@ public class ParDoTest implements Serializable {
state.put(value.getKey(), new MyInteger(value.getValue()));
count.add(1);
if (count.read() >= 4) {
- Iterable<Map.Entry<String, MyInteger>> iterate = state.iterate();
+ Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
for (Map.Entry<String, MyInteger> entry : iterate) {
c.output(KV.of(entry.getKey(), entry.getValue()));
}
@@ -2320,7 +2320,7 @@ public class ParDoTest implements Serializable {
state.put(value.getKey(), new MyInteger(value.getValue()));
count.add(1);
if (count.read() >= 4) {
- Iterable<Map.Entry<String, MyInteger>> iterate = state.iterate();
+ Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
for (Map.Entry<String, MyInteger> entry : iterate) {
c.output(KV.of(entry.getKey(), entry.getValue()));
}
[2/2] beam git commit: This closes #2353: Revise MapState and
SetState APIs to leverage ReadableState
Posted by ke...@apache.org.
This closes #2353: Revise MapState and SetState APIs to leverage ReadableState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8edbb81
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8edbb81
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8edbb81
Branch: refs/heads/master
Commit: a8edbb81f340578966400d106297c1732ca3b7fd
Parents: b92032f 604be67
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 6 15:37:47 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 15:37:47 2017 -0700
----------------------------------------------------------------------
.../runners/core/InMemoryStateInternals.java | 84 ++++---------------
.../core/InMemoryStateInternalsTest.java | 87 +++++++++-----------
.../CopyOnAccessInMemoryStateInternalsTest.java | 16 ++--
.../apache/beam/sdk/util/state/MapState.java | 52 +++++-------
.../beam/sdk/util/state/ReadableStates.java | 45 ++++++++++
.../apache/beam/sdk/util/state/SetState.java | 34 +-------
.../apache/beam/sdk/transforms/ParDoTest.java | 6 +-
7 files changed, 134 insertions(+), 190 deletions(-)
----------------------------------------------------------------------