You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/07 01:13:19 UTC

[1/2] beam git commit: Revise MapState and SetState APIs to leverage ReadableState

Repository: beam
Updated Branches:
  refs/heads/master b92032ff6 -> a8edbb81f


Revise MapState and SetState APIs to leverage ReadableState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/604be670
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/604be670
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/604be670

Branch: refs/heads/master
Commit: 604be6703daadfcf085d69ee2859577218d6b3d4
Parents: b92032f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Mar 28 20:08:45 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 15:36:09 2017 -0700

----------------------------------------------------------------------
 .../runners/core/InMemoryStateInternals.java    | 84 ++++---------------
 .../core/InMemoryStateInternalsTest.java        | 87 +++++++++-----------
 .../CopyOnAccessInMemoryStateInternalsTest.java | 16 ++--
 .../apache/beam/sdk/util/state/MapState.java    | 52 +++++-------
 .../beam/sdk/util/state/ReadableStates.java     | 45 ++++++++++
 .../apache/beam/sdk/util/state/SetState.java    | 34 +-------
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +-
 7 files changed, 134 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 0d5b058..55b7fc2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -42,6 +40,7 @@ import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.ReadableStates;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
@@ -468,13 +467,15 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public boolean contains(T t) {
-      return contents.contains(t);
+    public ReadableState<Boolean> contains(T t) {
+      return ReadableStates.immediate(contents.contains(t));
     }
 
     @Override
-    public boolean addIfAbsent(T t) {
-      return contents.add(t);
+    public ReadableState<Boolean> addIfAbsent(T t) {
+      boolean alreadyContained = contents.contains(t);
+      contents.add(t);
+      return ReadableStates.immediate(!alreadyContained);
     }
 
     @Override
@@ -483,33 +484,6 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public SetState<T> readLater(Iterable<T> elements) {
-      return this;
-    }
-
-    @Override
-    public boolean containsAny(Iterable<T> elements) {
-      elements = checkNotNull(elements);
-      for (T t : elements) {
-        if (contents.contains(t)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public boolean containsAll(Iterable<T> elements) {
-      elements = checkNotNull(elements);
-      for (T t : elements) {
-        if (!contents.contains(t)) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    @Override
     public InMemorySet<T> readLater() {
       return this;
     }
@@ -565,8 +539,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public V get(K key) {
-      return contents.get(key);
+    public ReadableState<V> get(K key) {
+      return ReadableStates.immediate(contents.get(key));
     }
 
     @Override
@@ -575,13 +549,13 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public V putIfAbsent(K key, V value) {
+    public ReadableState<V> putIfAbsent(K key, V value) {
       V v = contents.get(key);
       if (v == null) {
         v = contents.put(key, value);
       }
 
-      return v;
+      return ReadableStates.immediate(v);
     }
 
     @Override
@@ -590,42 +564,18 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public Iterable<V> get(Iterable<K> keys) {
-      List<V> values = new ArrayList<>();
-      for (K k : keys) {
-        values.add(contents.get(k));
-      }
-      return values;
+    public ReadableState<Iterable<K>> keys() {
+      return ReadableStates.immediate((Iterable<K>) contents.keySet());
     }
 
     @Override
-    public MapState<K, V> getLater(K k) {
-      return this;
-    }
-
-    @Override
-    public MapState<K, V> getLater(Iterable<K> keys) {
-      return this;
-    }
-
-    @Override
-    public Iterable<K> keys() {
-      return contents.keySet();
-    }
-
-    @Override
-    public Iterable<V> values() {
-      return contents.values();
-    }
-
-    @Override
-    public MapState<K, V> iterateLater() {
-      return this;
+    public ReadableState<Iterable<V>> values() {
+      return ReadableStates.immediate((Iterable<V>) contents.values());
     }
 
     @Override
-    public Iterable<Map.Entry<K, V>> iterate() {
-      return contents.entrySet();
+    public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+      return ReadableStates.immediate((Iterable<Map.Entry<K, V>>) contents.entrySet());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index e4fb5c1..34ddae6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -26,7 +28,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -112,10 +113,10 @@ public class InMemoryStateInternalsTest {
 
     assertThat(value.read(), Matchers.emptyIterable());
     value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+    assertThat(value.read(), containsInAnyOrder("hello"));
 
     value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+    assertThat(value.read(), containsInAnyOrder("hello", "world"));
 
     value.clear();
     assertThat(value.read(), Matchers.emptyIterable());
@@ -147,7 +148,7 @@ public class InMemoryStateInternalsTest {
     StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
 
     // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!"));
     assertThat(bag2.read(), Matchers.emptyIterable());
   }
 
@@ -164,7 +165,7 @@ public class InMemoryStateInternalsTest {
     StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
 
     // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!"));
     assertThat(bag1.read(), Matchers.emptyIterable());
     assertThat(bag2.read(), Matchers.emptyIterable());
   }
@@ -179,41 +180,32 @@ public class InMemoryStateInternalsTest {
 
     // empty
     assertThat(value.read(), Matchers.emptyIterable());
-    assertFalse(value.contains("A"));
-    assertFalse(value.containsAny(Collections.singletonList("A")));
+    assertFalse(value.contains("A").read());
 
     // add
     value.add("A");
     value.add("B");
     value.add("A");
-    assertFalse(value.addIfAbsent("B"));
-    assertThat(value.read(), Matchers.containsInAnyOrder("A", "B"));
+    assertFalse(value.addIfAbsent("B").read());
+    assertThat(value.read(), containsInAnyOrder("A", "B"));
 
     // remove
     value.remove("A");
-    assertThat(value.read(), Matchers.containsInAnyOrder("B"));
+    assertThat(value.read(), containsInAnyOrder("B"));
     value.remove("C");
-    assertThat(value.read(), Matchers.containsInAnyOrder("B"));
+    assertThat(value.read(), containsInAnyOrder("B"));
 
     // contains
-    assertFalse(value.contains("A"));
-    assertTrue(value.contains("B"));
+    assertFalse(value.contains("A").read());
+    assertTrue(value.contains("B").read());
     value.add("C");
     value.add("D");
 
-    // containsAny
-    assertTrue(value.containsAny(Arrays.asList("A", "C")));
-    assertFalse(value.containsAny(Arrays.asList("A", "E")));
-
-    // containsAll
-    assertTrue(value.containsAll(Arrays.asList("B", "C")));
-    assertFalse(value.containsAll(Arrays.asList("A", "B")));
-
     // readLater
-    assertThat(value.readLater().read(), Matchers.containsInAnyOrder("B", "C", "D"));
-    SetState<String> later = value.readLater(Arrays.asList("A", "C", "D"));
-    assertTrue(later.containsAll(Arrays.asList("C", "D")));
-    assertFalse(later.contains("A"));
+    assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D"));
+    SetState<String> later = value.readLater();
+    assertThat(later.read(), hasItems("C", "D"));
+    assertFalse(later.contains("A").read());
 
     // clear
     value.clear();
@@ -248,7 +240,7 @@ public class InMemoryStateInternalsTest {
     StateMerging.mergeSets(Arrays.asList(set1, set2), set1);
 
     // Reading the merged set gets both the contents
-    assertThat(set1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!"));
     assertThat(set2.read(), Matchers.emptyIterable());
   }
 
@@ -266,7 +258,7 @@ public class InMemoryStateInternalsTest {
     StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3);
 
     // Reading the merged set gets both the contents
-    assertThat(set3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!"));
     assertThat(set1.read(), Matchers.emptyIterable());
     assertThat(set2.read(), Matchers.emptyIterable());
   }
@@ -330,49 +322,46 @@ public class InMemoryStateInternalsTest {
     assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR))));
 
     // put
-    assertThat(value.iterate(), Matchers.emptyIterable());
+    assertThat(value.entries().read(), Matchers.emptyIterable());
     value.put("A", 1);
     value.put("B", 2);
     value.put("A", 11);
-    assertThat(value.putIfAbsent("B", 22), equalTo(2));
-    assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("A", 11),
+    assertThat(value.putIfAbsent("B", 22).read(), equalTo(2));
+    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11),
         MapEntry.of("B", 2)));
 
     // remove
     value.remove("A");
-    assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("B", 2)));
+    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
     value.remove("C");
-    assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("B", 2)));
+    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
 
     // get
-    assertNull(value.get("A"));
-    assertThat(value.get("B"), equalTo(2));
+    assertNull(value.get("A").read());
+    assertThat(value.get("B").read(), equalTo(2));
     value.put("C", 3);
     value.put("D", 4);
-    assertThat(value.get("C"), equalTo(3));
-    assertThat(value.get(Collections.singletonList("D")), Matchers.containsInAnyOrder(4));
-    assertThat(value.get(Arrays.asList("B", "C")), Matchers.containsInAnyOrder(2, 3));
+    assertThat(value.get("C").read(), equalTo(3));
 
     // iterate
     value.put("E", 5);
     value.remove("C");
-    assertThat(value.keys(), Matchers.containsInAnyOrder("B", "D", "E"));
-    assertThat(value.values(), Matchers.containsInAnyOrder(2, 4, 5));
-    assertThat(value.iterate(), Matchers.containsInAnyOrder(
-        MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+    assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E"));
+    assertThat(value.values().read(), containsInAnyOrder(2, 4, 5));
+    assertThat(
+        value.entries().read(),
+        containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
 
     // readLater
-    assertThat(value.getLater("B").get("B"), equalTo(2));
-    assertNull(value.getLater("A").get("A"));
-    MapState<String, Integer> later = value.getLater(Arrays.asList("C", "D"));
-    assertNull(later.get("C"));
-    assertThat(later.get("D"), equalTo(4));
-    assertThat(value.iterateLater().iterate(), Matchers.containsInAnyOrder(
-        MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+    assertThat(value.get("B").readLater().read(), equalTo(2));
+    assertNull(value.get("A").readLater().read());
+    assertThat(
+        value.entries().readLater().read(),
+        containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
 
     // clear
     value.clear();
-    assertThat(value.iterate(), Matchers.emptyIterable());
+    assertThat(value.entries().read(), Matchers.emptyIterable());
     assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.sameInstance(value));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 142af32..68c6613 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -201,24 +201,24 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     StateTag<Object, MapState<String, Integer>> valueTag =
         StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
     MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag);
-    assertThat(underlyingValue.iterate(), emptyIterable());
+    assertThat(underlyingValue.entries().read(), emptyIterable());
 
     underlyingValue.put("hello", 1);
-    assertThat(underlyingValue.get("hello"), equalTo(1));
+    assertThat(underlyingValue.get("hello").read(), equalTo(1));
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag);
-    assertThat(copyOnAccessState.get("hello"), equalTo(1));
+    assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
 
     copyOnAccessState.put("world", 4);
-    assertThat(copyOnAccessState.get("hello"), equalTo(1));
-    assertThat(copyOnAccessState.get("world"), equalTo(4));
-    assertThat(underlyingValue.get("hello"), equalTo(1));
-    assertNull(underlyingValue.get("world"));
+    assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
+    assertThat(copyOnAccessState.get("world").read(), equalTo(4));
+    assertThat(underlyingValue.get("hello").read(), equalTo(1));
+    assertNull(underlyingValue.get("world").read());
 
     MapState<String, Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag);
-    assertThat(underlyingValue.iterate(), equalTo(reReadUnderlyingValue.iterate()));
+    assertThat(underlyingValue.entries().read(), equalTo(reReadUnderlyingValue.entries().read()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
index 85d99d6..fb7e807 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
@@ -30,21 +30,20 @@ import java.util.Map;
 public interface MapState<K, V> extends State {
 
   /**
-   * Returns the value to which the specified key is mapped in the state.
-   */
-  V get(K key);
-
-  /**
    * Associates the specified value with the specified key in this state.
    */
   void put(K key, V value);
 
   /**
-   * If the specified key is not already associated with a value (or is mapped
-   * to {@code null}) associates it with the given value and returns
-   * {@code null}, else returns the current value.
+   * A deferred read-followed-by-write.
+   *
+   * <p>When {@code read()} is called on the result, or when the state is committed, this forces a
+   * read of the map and reconciliation with any pending modifications.
+   *
+   * <p>If the key has no associated value (or is mapped to {@code null}), associates it with the
+   * given value and the result reads {@code null}; otherwise the result reads the current value.
    */
-  V putIfAbsent(K key, V value);
+  ReadableState<V> putIfAbsent(K key, V value);
 
   /**
    * Removes the mapping for a key from this map if it is present.
@@ -52,42 +51,29 @@ public interface MapState<K, V> extends State {
   void remove(K key);
 
   /**
-   * A bulk get.
-   * @param keys the keys to search for
-   * @return a iterable view of values, maybe some values is null.
-   * The order of values corresponds to the order of the keys.
+   * A deferred lookup.
+   *
+   * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()}
+   * on the results.
+   *
+   * <p>When {@code read()} is called, a particular state implementation is encouraged to perform
+   * all pending reads in a single batch.
    */
-  Iterable<V> get(Iterable<K> keys);
-
-  /**
-   * Indicate that specified key will be read later.
-   */
-  MapState<K, V> getLater(K k);
-
-  /**
-   * Indicate that specified batch keys will be read later.
-   */
-  MapState<K, V> getLater(Iterable<K> keys);
+  ReadableState<V> get(K key);
 
   /**
    * Returns a iterable view of the keys contained in this map.
    */
-  Iterable<K> keys();
+  ReadableState<Iterable<K>> keys();
 
   /**
    * Returns a iterable view of the values contained in this map.
    */
-  Iterable<V> values();
-
-  /**
-   * Indicate that all key-values will be read later.
-   */
-  MapState<K, V> iterateLater();
+  ReadableState<Iterable<V>> values();
 
   /**
    * Returns a iterable view of all key-values.
    */
-  Iterable<Map.Entry<K, V>> iterate();
-
+  ReadableState<Iterable<Map.Entry<K, V>>> entries();
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
new file mode 100644
index 0000000..819eda6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Utilities for constructing and manipulating {@link ReadableState} instances.
+ */
+@Experimental(Kind.STATE)
+public class ReadableStates {
+
+  /**
+   * A {@link ReadableState} constructed from a constant value, hence immediately available.
+   */
+  public static <T> ReadableState<T> immediate(final T value) {
+    return new ReadableState<T>() {
+      @Override
+      public T read() {
+        return value;
+      }
+
+      @Override
+      public ReadableState<T> readLater() {
+        return this;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
index 5c907d5..56ea510 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
@@ -27,45 +27,19 @@ public interface SetState<T> extends GroupingState<T, Iterable<T>> {
   /**
    * Returns true if this set contains the specified element.
    */
-  boolean contains(T t);
+  ReadableState<Boolean> contains(T t);
 
   /**
-   * Add a value to the buffer if it is not already present.
-   * If this set already contains the element, the call leaves the set
-   * unchanged and returns false.
+   * Ensures a value is a member of the set, returning {@code true} if it was added and {@code
+   * false} otherwise.
    */
-  boolean addIfAbsent(T t);
+  ReadableState<Boolean> addIfAbsent(T t);
 
   /**
    * Removes the specified element from this set if it is present.
    */
   void remove(T t);
 
-  /**
-   * Indicate that elements will be read later.
-   * @param elements to be read later
-   * @return this for convenient chaining
-   */
-  SetState<T> readLater(Iterable<T> elements);
-
-  /**
-   * <p>Checks if SetState contains any given elements.</p>
-   *
-   * @param elements the elements to search for
-   * @return the {@code true} if any of the elements are found,
-   * {@code false} if no match
-   */
-  boolean containsAny(Iterable<T> elements);
-
-  /**
-   * <p>Checks if SetState contains all given elements.</p>
-   *
-   * @param elements the elements to find
-   * @return true if the SetState contains all elements,
-   *  false if not
-   */
-  boolean containsAll(Iterable<T> elements);
-
   @Override
   SetState<T> readLater();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e305da1..b429eab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2228,7 +2228,7 @@ public class ParDoTest implements Serializable {
             state.put(value.getKey(), value.getValue());
             count.add(1);
             if (count.read() >= 4) {
-              Iterable<Map.Entry<String, Integer>> iterate = state.iterate();
+              Iterable<Map.Entry<String, Integer>> iterate = state.entries().read();
               for (Map.Entry<String, Integer> entry : iterate) {
                 c.output(KV.of(entry.getKey(), entry.getValue()));
               }
@@ -2274,7 +2274,7 @@ public class ParDoTest implements Serializable {
             state.put(value.getKey(), new MyInteger(value.getValue()));
             count.add(1);
             if (count.read() >= 4) {
-              Iterable<Map.Entry<String, MyInteger>> iterate = state.iterate();
+              Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
               for (Map.Entry<String, MyInteger> entry : iterate) {
                 c.output(KV.of(entry.getKey(), entry.getValue()));
               }
@@ -2320,7 +2320,7 @@ public class ParDoTest implements Serializable {
             state.put(value.getKey(), new MyInteger(value.getValue()));
             count.add(1);
             if (count.read() >= 4) {
-              Iterable<Map.Entry<String, MyInteger>> iterate = state.iterate();
+              Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
               for (Map.Entry<String, MyInteger> entry : iterate) {
                 c.output(KV.of(entry.getKey(), entry.getValue()));
               }


[2/2] beam git commit: This closes #2353: Revise MapState and SetState APIs to leverage ReadableState

Posted by ke...@apache.org.
This closes #2353: Revise MapState and SetState APIs to leverage ReadableState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8edbb81
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8edbb81
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8edbb81

Branch: refs/heads/master
Commit: a8edbb81f340578966400d106297c1732ca3b7fd
Parents: b92032f 604be67
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 6 15:37:47 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 15:37:47 2017 -0700

----------------------------------------------------------------------
 .../runners/core/InMemoryStateInternals.java    | 84 ++++---------------
 .../core/InMemoryStateInternalsTest.java        | 87 +++++++++-----------
 .../CopyOnAccessInMemoryStateInternalsTest.java | 16 ++--
 .../apache/beam/sdk/util/state/MapState.java    | 52 +++++-------
 .../beam/sdk/util/state/ReadableStates.java     | 45 ++++++++++
 .../apache/beam/sdk/util/state/SetState.java    | 34 +-------
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +-
 7 files changed, 134 insertions(+), 190 deletions(-)
----------------------------------------------------------------------