Posted to commits@beam.apache.org by al...@apache.org on 2017/06/13 09:45:03 UTC

[1/4] beam git commit: Add set and map readable test to StateInternalsTest

Repository: beam
Updated Branches:
  refs/heads/master fe3d55403 -> 7126fdc6e


Add set and map readable test to StateInternalsTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d186063
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d186063
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d186063

Branch: refs/heads/master
Commit: 4d18606378f43c7b0d3ac05d45ca6e0570e49eef
Parents: 10b166b
Author: JingsongLi <lz...@aliyun.com>
Authored: Tue Jun 13 10:15:33 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:17 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/core/StateInternalsTest.java   | 40 ++++++++++++++++++++
 1 file changed, 40 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4d186063/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
index bf3156a..6011fb4 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
@@ -570,4 +571,43 @@ public abstract class StateInternalsTest {
     assertThat(value1.read(), equalTo(null));
     assertThat(value2.read(), equalTo(null));
   }
+
+  @Test
+  public void testSetReadable() throws Exception {
+    SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+
+    // test contains: the ReadableState obtained before add("A") should not reflect the later add
+    ReadableState<Boolean> readable = value.contains("A");
+    value.add("A");
+    assertFalse(readable.read());
+
+    // test addIfAbsent
+    value.addIfAbsent("B");
+    assertTrue(value.contains("B").read());
+  }
+
+  @Test
+  public void testMapReadable() throws Exception {
+    MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
+
+    // test iterables: keys(), values() and entries() should just return iterable views of this map.
+    // The iterables are backed by the map, so changes to the map are reflected in them.
+    ReadableState<Iterable<String>> keys = value.keys();
+    ReadableState<Iterable<Integer>> values = value.values();
+    ReadableState<Iterable<Map.Entry<String, Integer>>> entries = value.entries();
+    value.put("A", 1);
+    assertFalse(Iterables.isEmpty(keys.read()));
+    assertFalse(Iterables.isEmpty(values.read()));
+    assertFalse(Iterables.isEmpty(entries.read()));
+
+    // test get: the ReadableState obtained before put("B", 2) should not reflect the later put
+    ReadableState<Integer> get = value.get("B");
+    value.put("B", 2);
+    assertNull(get.read());
+
+    // test putIfAbsent
+    value.putIfAbsent("C", 3);
+    assertThat(value.get("C").read(), equalTo(3));
+  }
+
 }
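
For context, the SetState and MapState APIs exercised by these new tests are the ones a pipeline
author reaches through a stateful DoFn. A minimal usage sketch follows (not part of the commit;
the DoFn name, state ids and the dedup-and-count logic are hypothetical, and only the SetState and
MapState calls mirror what the tests above cover):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Hypothetical stateful DoFn illustrating SetState and MapState usage. */
class DedupAndCountFn extends DoFn<KV<String, String>, KV<String, Integer>> {

  @StateId("seen")
  private final StateSpec<SetState<String>> seenSpec = StateSpecs.set(StringUtf8Coder.of());

  @StateId("counts")
  private final StateSpec<MapState<String, Integer>> countsSpec =
      StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("seen") SetState<String> seen,
      @StateId("counts") MapState<String, Integer> counts) {
    String word = c.element().getValue();
    // Count every occurrence; MapState.get() reads null for an absent key.
    Integer current = counts.get(word).read();
    int updated = (current == null ? 0 : current) + 1;
    counts.put(word, updated);
    // Emit each distinct word once; addIfAbsent reads true only when the word was newly added.
    if (seen.addIfAbsent(word).read()) {
      c.output(KV.of(word, updated));
    }
  }
}

The ReadableState handles returned by contains(), addIfAbsent(), get() and putIfAbsent() are what
testSetReadable and testMapReadable above pin down.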


[3/4] beam git commit: Use CoderTypeSerializer and remove unused code in FlinkStateInternals

Posted by al...@apache.org.
Use CoderTypeSerializer and remove unused code in FlinkStateInternals


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c365087
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c365087
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c365087

Branch: refs/heads/master
Commit: 4c36508733a69fafce0f7dfb86c71eee5eb6bc84
Parents: fe3d554
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jun 7 14:34:25 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:17 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/FlinkStateInternals.java    | 198 +------------------
 1 file changed, 10 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4c365087/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index f0d3278..d8771de 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -196,9 +195,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.address = address;
       this.flinkStateBackend = flinkStateBackend;
 
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(coder));
     }
 
     @Override
@@ -282,9 +280,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.address = address;
       this.flinkStateBackend = flinkStateBackend;
 
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
+      flinkStateDescriptor = new ListStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(coder));
     }
 
     @Override
@@ -398,9 +395,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.combineFn = combineFn;
       this.flinkStateBackend = flinkStateBackend;
 
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(accumCoder));
     }
 
     @Override
@@ -545,179 +541,6 @@ public class FlinkStateInternals<K> implements StateInternals {
     }
   }
 
-  private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-
-    FlinkKeyedCombiningState(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
-        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkStateInternals<K> flinkStateInternals) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator();
-        }
-        current = combineFn.addInput(current, value);
-        state.update(current);
-      } catch (RuntimeException re) {
-        throw re;
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        if (accum != null) {
-          return combineFn.extractOutput(accum);
-        } else {
-          return combineFn.extractOutput(combineFn.createAccumulator());
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkKeyedCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
   private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
       implements CombiningState<InputT, AccumT, OutputT> {
 
@@ -745,9 +568,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.flinkStateInternals = flinkStateInternals;
       this.context = context;
 
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(accumCoder));
     }
 
     @Override
@@ -913,8 +735,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateInternals = flinkStateInternals;
 
-      CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(InstantCoder.of()));
     }
 
     @Override
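
Viewed in isolation, the construction pattern this commit standardizes on looks roughly like the
sketch below (not part of the commit; the helper class and method names are hypothetical). A Beam
Coder is wrapped in a CoderTypeSerializer and handed directly to the Flink state descriptor,
replacing the earlier detour through CoderTypeInformation plus an explicit null default value:

import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.flink.api.common.state.ValueStateDescriptor;

/** Hypothetical helper showing the descriptor construction used throughout the diff. */
final class StateDescriptors {
  private StateDescriptors() {}

  static <T> ValueStateDescriptor<T> forCoder(String stateId, Coder<T> coder) {
    // The CoderTypeSerializer adapts the Beam Coder to Flink's TypeSerializer interface.
    return new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder));
  }
}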


[4/4] beam git commit: This closes #3309

Posted by al...@apache.org.
This closes #3309


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7126fdc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7126fdc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7126fdc6

Branch: refs/heads/master
Commit: 7126fdc6ee5671e99a2dede3f25ba616aa0e8fa4
Parents: fe3d554 4d18606
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jun 13 11:35:43 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:43 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/core/StateInternalsTest.java   |  40 ++
 runners/flink/pom.xml                           |   1 -
 .../streaming/state/FlinkStateInternals.java    | 425 +++++++++----------
 .../streaming/FlinkStateInternalsTest.java      |  17 -
 4 files changed, 232 insertions(+), 251 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: [BEAM-1483] Support SetState in Flink runner and fix MapState to be consistent with InMemoryStateInternals.

Posted by al...@apache.org.
[BEAM-1483] Support SetState in Flink runner and fix MapState to be consistent with InMemoryStateInternals.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10b166b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10b166b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10b166b3

Branch: refs/heads/master
Commit: 10b166b355a03daeae78dd1e71016fc72805939d
Parents: 4c36508
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jun 7 14:40:30 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:17 2017 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml                           |   1 -
 .../streaming/state/FlinkStateInternals.java    | 227 +++++++++++++++----
 .../streaming/FlinkStateInternalsTest.java      |  17 --
 3 files changed, 182 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10b166b3/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a5b8203..339aa8e 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -91,7 +91,6 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.LargeKeys$Above100MB,
-                    org.apache.beam.sdk.testing.UsesSetState,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
                     org.apache.beam.sdk.testing.UsesTestStream,
                     org.apache.beam.sdk.testing.UsesSplittableParDo

http://git-wip-us.apache.org/repos/asf/beam/blob/10b166b3/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index d8771de..a0b015b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.MapState;
 import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.state.ReadableStates;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateContext;
@@ -48,6 +50,7 @@ import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.joda.time.Instant;
@@ -127,8 +130,8 @@ public class FlinkStateInternals<K> implements StateInternals {
           @Override
           public <T> SetState<T> bindSet(
               StateTag<SetState<T>> address, Coder<T> elemCoder) {
-            throw new UnsupportedOperationException(
-                String.format("%s is not supported", SetState.class.getSimpleName()));
+            return new FlinkSetState<>(
+                flinkStateBackend, address, namespace, elemCoder);
           }
 
           @Override
@@ -875,24 +878,15 @@ public class FlinkStateInternals<K> implements StateInternals {
 
     @Override
     public ReadableState<ValueT> get(final KeyT input) {
-      return new ReadableState<ValueT>() {
-        @Override
-        public ValueT read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
+      try {
+        return ReadableStates.immediate(
+            flinkStateBackend.getPartitionedState(
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
-                flinkStateDescriptor).get(input);
-          } catch (Exception e) {
-            throw new RuntimeException("Error get from state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<ValueT> readLater() {
-          return this;
-        }
-      };
+                flinkStateDescriptor).get(input));
+      } catch (Exception e) {
+        throw new RuntimeException("Error get from state.", e);
+      }
     }
 
     @Override
@@ -909,32 +903,22 @@ public class FlinkStateInternals<K> implements StateInternals {
 
     @Override
     public ReadableState<ValueT> putIfAbsent(final KeyT key, final ValueT value) {
-      return new ReadableState<ValueT>() {
-        @Override
-        public ValueT read() {
-          try {
-            ValueT current = flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).get(key);
-
-            if (current == null) {
-              flinkStateBackend.getPartitionedState(
-                  namespace.stringKey(),
-                  StringSerializer.INSTANCE,
-                  flinkStateDescriptor).put(key, value);
-            }
-            return current;
-          } catch (Exception e) {
-            throw new RuntimeException("Error put kv to state.", e);
-          }
-        }
+      try {
+        ValueT current = flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).get(key);
 
-        @Override
-        public ReadableState<ValueT> readLater() {
-          return this;
+        if (current == null) {
+          flinkStateBackend.getPartitionedState(
+              namespace.stringKey(),
+              StringSerializer.INSTANCE,
+              flinkStateDescriptor).put(key, value);
         }
-      };
+        return ReadableStates.immediate(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error put kv to state.", e);
+      }
     }
 
     @Override
@@ -955,10 +939,11 @@ public class FlinkStateInternals<K> implements StateInternals {
         @Override
         public Iterable<KeyT> read() {
           try {
-            return flinkStateBackend.getPartitionedState(
+            Iterable<KeyT> result = flinkStateBackend.getPartitionedState(
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).keys();
+            return result != null ? result : Collections.<KeyT>emptyList();
           } catch (Exception e) {
             throw new RuntimeException("Error get map state keys.", e);
           }
@@ -977,10 +962,11 @@ public class FlinkStateInternals<K> implements StateInternals {
         @Override
         public Iterable<ValueT> read() {
           try {
-            return flinkStateBackend.getPartitionedState(
+            Iterable<ValueT> result = flinkStateBackend.getPartitionedState(
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).values();
+            return result != null ? result : Collections.<ValueT>emptyList();
           } catch (Exception e) {
             throw new RuntimeException("Error get map state values.", e);
           }
@@ -999,10 +985,11 @@ public class FlinkStateInternals<K> implements StateInternals {
         @Override
         public Iterable<Map.Entry<KeyT, ValueT>> read() {
           try {
-            return flinkStateBackend.getPartitionedState(
+            Iterable<Map.Entry<KeyT, ValueT>> result = flinkStateBackend.getPartitionedState(
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).entries();
+            return result != null ? result : Collections.<Map.Entry<KeyT, ValueT>>emptyList();
           } catch (Exception e) {
             throw new RuntimeException("Error get map state entries.", e);
           }
@@ -1050,4 +1037,154 @@ public class FlinkStateInternals<K> implements StateInternals {
     }
   }
 
+  private static class FlinkSetState<T> implements SetState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<SetState<T>> address;
+    private final MapStateDescriptor<T, Boolean> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkSetState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<SetState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      this.namespace = namespace;
+      this.address = address;
+      this.flinkStateBackend = flinkStateBackend;
+      this.flinkStateDescriptor = new MapStateDescriptor<>(address.getId(),
+          new CoderTypeSerializer<>(coder), new BooleanSerializer());
+    }
+
+    @Override
+    public ReadableState<Boolean> contains(final T t) {
+      try {
+        Boolean result = flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).get(t);
+        return ReadableStates.immediate(result != null ? result : false);
+      } catch (Exception e) {
+        throw new RuntimeException("Error contains value from state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> addIfAbsent(final T t) {
+      try {
+        org.apache.flink.api.common.state.MapState<T, Boolean> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+        boolean alreadyContained = state.contains(t);
+        if (!alreadyContained) {
+          state.put(t, true);
+        }
+        return ReadableStates.immediate(!alreadyContained);
+      } catch (Exception e) {
+        throw new RuntimeException("Error addIfAbsent value to state.", e);
+      }
+    }
+
+    @Override
+    public void remove(T t) {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).remove(t);
+      } catch (Exception e) {
+        throw new RuntimeException("Error remove value to state.", e);
+      }
+    }
+
+    @Override
+    public SetState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(T value) {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).put(value, true);
+      } catch (Exception e) {
+        throw new RuntimeException("Error add value to state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            Iterable<T> result = flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor).keys();
+            return result == null || Iterables.isEmpty(result);
+          } catch (Exception e) {
+            throw new RuntimeException("Error isEmpty from state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public Iterable<T> read() {
+      try {
+        Iterable<T> result = flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).keys();
+        return result != null ? result : Collections.<T>emptyList();
+      } catch (Exception e) {
+        throw new RuntimeException("Error read from state.", e);
+      }
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkSetState<?> that = (FlinkSetState<?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10b166b3/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index e7564ec..b8d41de 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -63,21 +63,4 @@ public class FlinkStateInternalsTest extends StateInternalsTest {
     }
   }
 
-  ///////////////////////// Unsupported tests \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
-
-  @Override
-  public void testSet() {}
-
-  @Override
-  public void testSetIsEmpty() {}
-
-  @Override
-  public void testMergeSetIntoSource() {}
-
-  @Override
-  public void testMergeSetIntoNewNamespace() {}
-
-  @Override
-  public void testMap() {}
-
 }
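
With the empty overrides removed, the Flink runner now runs the inherited set and map tests from
StateInternalsTest. As a standalone illustration of the read-then-mutate contract those tests pin
down, here is a hypothetical snippet against the in-memory StateInternals from runners-core (the
StateTags.set factory and the InMemoryStateInternals.forKey signature are assumptions here, not
taken from the diffs above):

import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;

public class SetStateReadableDemo {
  public static void main(String[] args) {
    // In-memory state internals keyed by an arbitrary dummy key.
    StateInternals underTest = InMemoryStateInternals.forKey("dummyKey");
    SetState<String> set = underTest.state(
        StateNamespaces.global(), StateTags.set("demoSet", StringUtf8Coder.of()));

    ReadableState<Boolean> containsA = set.contains("A");
    set.add("A");
    // Per testSetReadable, the handle obtained before add("A") does not see the add ...
    System.out.println(containsA.read());          // expected: false
    // ... while a fresh contains() query does.
    System.out.println(set.contains("A").read());  // expected: true
  }
}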