You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/13 09:45:03 UTC
[1/4] beam git commit: Add set and map readable test to
StateInternalsTest
Repository: beam
Updated Branches:
refs/heads/master fe3d55403 -> 7126fdc6e
Add set and map readable test to StateInternalsTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d186063
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d186063
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d186063
Branch: refs/heads/master
Commit: 4d18606378f43c7b0d3ac05d45ca6e0570e49eef
Parents: 10b166b
Author: JingsongLi <lz...@aliyun.com>
Authored: Tue Jun 13 10:15:33 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:17 2017 +0200
----------------------------------------------------------------------
.../beam/runners/core/StateInternalsTest.java | 40 ++++++++++++++++++++
1 file changed, 40 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4d186063/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
index bf3156a..6011fb4 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Iterables;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
@@ -570,4 +571,43 @@ public abstract class StateInternalsTest {
assertThat(value1.read(), equalTo(null));
assertThat(value2.read(), equalTo(null));
}
+
+ @Test
+ public void testSetReadable() throws Exception {
+ SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+
+ // test contains
+ ReadableState<Boolean> readable = value.contains("A");
+ value.add("A");
+ assertFalse(readable.read());
+
+ // test addIfAbsent
+ value.addIfAbsent("B");
+ assertTrue(value.contains("B").read());
+ }
+
+ @Test
+ public void testMapReadable() throws Exception {
+ MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
+
+ // test iterable, should just return a iterable view of the values contained in this map.
+ // The iterable is backed by the map, so changes to the map are reflected in the iterable.
+ ReadableState<Iterable<String>> keys = value.keys();
+ ReadableState<Iterable<Integer>> values = value.values();
+ ReadableState<Iterable<Map.Entry<String, Integer>>> entries = value.entries();
+ value.put("A", 1);
+ assertFalse(Iterables.isEmpty(keys.read()));
+ assertFalse(Iterables.isEmpty(values.read()));
+ assertFalse(Iterables.isEmpty(entries.read()));
+
+ // test get
+ ReadableState<Integer> get = value.get("B");
+ value.put("B", 2);
+ assertNull(get.read());
+
+ // test addIfAbsent
+ value.putIfAbsent("C", 3);
+ assertThat(value.get("C").read(), equalTo(3));
+ }
+
}
[3/4] beam git commit: Use CoderTypeSerializer and remove unuse code
in FlinkStateInternals
Posted by al...@apache.org.
Use CoderTypeSerializer and remove unuse code in FlinkStateInternals
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c365087
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c365087
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c365087
Branch: refs/heads/master
Commit: 4c36508733a69fafce0f7dfb86c71eee5eb6bc84
Parents: fe3d554
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jun 7 14:34:25 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:17 2017 +0200
----------------------------------------------------------------------
.../streaming/state/FlinkStateInternals.java | 198 +------------------
1 file changed, 10 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4c365087/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index f0d3278..d8771de 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -25,7 +25,6 @@ import java.util.Map;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -196,9 +195,8 @@ public class FlinkStateInternals<K> implements StateInternals {
this.address = address;
this.flinkStateBackend = flinkStateBackend;
- CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+ flinkStateDescriptor = new ValueStateDescriptor<>(
+ address.getId(), new CoderTypeSerializer<>(coder));
}
@Override
@@ -282,9 +280,8 @@ public class FlinkStateInternals<K> implements StateInternals {
this.address = address;
this.flinkStateBackend = flinkStateBackend;
- CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
- flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
+ flinkStateDescriptor = new ListStateDescriptor<>(
+ address.getId(), new CoderTypeSerializer<>(coder));
}
@Override
@@ -398,9 +395,8 @@ public class FlinkStateInternals<K> implements StateInternals {
this.combineFn = combineFn;
this.flinkStateBackend = flinkStateBackend;
- CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+ flinkStateDescriptor = new ValueStateDescriptor<>(
+ address.getId(), new CoderTypeSerializer<>(accumCoder));
}
@Override
@@ -545,179 +541,6 @@ public class FlinkStateInternals<K> implements StateInternals {
}
}
- private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
- private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
- private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private final FlinkStateInternals<K> flinkStateInternals;
-
- FlinkKeyedCombiningState(
- KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<CombiningState<InputT, AccumT, OutputT>> address,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder,
- FlinkStateInternals<K> flinkStateInternals) {
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- this.flinkStateBackend = flinkStateBackend;
- this.flinkStateInternals = flinkStateInternals;
-
- CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- current = combineFn.createAccumulator();
- }
- current = combineFn.addInput(current, value);
- state.update(current);
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state." , e);
- }
- }
-
- @Override
- public void addAccum(AccumT accum) {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT current = state.value();
- if (current == null) {
- state.update(accum);
- } else {
- current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
- state.update(current);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public AccumT getAccum() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT read() {
- try {
- org.apache.flink.api.common.state.ValueState<AccumT> state =
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor);
-
- AccumT accum = state.value();
- if (accum != null) {
- return combineFn.extractOutput(accum);
- } else {
- return combineFn.extractOutput(combineFn.createAccumulator());
- }
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).value() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- try {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).clear();
- } catch (Exception e) {
- throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkKeyedCombiningState<?, ?, ?, ?> that =
- (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
implements CombiningState<InputT, AccumT, OutputT> {
@@ -745,9 +568,8 @@ public class FlinkStateInternals<K> implements StateInternals {
this.flinkStateInternals = flinkStateInternals;
this.context = context;
- CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+ flinkStateDescriptor = new ValueStateDescriptor<>(
+ address.getId(), new CoderTypeSerializer<>(accumCoder));
}
@Override
@@ -913,8 +735,8 @@ public class FlinkStateInternals<K> implements StateInternals {
this.flinkStateBackend = flinkStateBackend;
this.flinkStateInternals = flinkStateInternals;
- CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
- flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+ flinkStateDescriptor = new ValueStateDescriptor<>(
+ address.getId(), new CoderTypeSerializer<>(InstantCoder.of()));
}
@Override
[4/4] beam git commit: This closes #3309
Posted by al...@apache.org.
This closes #3309
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7126fdc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7126fdc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7126fdc6
Branch: refs/heads/master
Commit: 7126fdc6ee5671e99a2dede3f25ba616aa0e8fa4
Parents: fe3d554 4d18606
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jun 13 11:35:43 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:43 2017 +0200
----------------------------------------------------------------------
.../beam/runners/core/StateInternalsTest.java | 40 ++
runners/flink/pom.xml | 1 -
.../streaming/state/FlinkStateInternals.java | 425 +++++++++----------
.../streaming/FlinkStateInternalsTest.java | 17 -
4 files changed, 232 insertions(+), 251 deletions(-)
----------------------------------------------------------------------
[2/4] beam git commit: [BEAM-1483] Support SetState in Flink runner
and fix MapState to be consistent with InMemoryStateInternals.
Posted by al...@apache.org.
[BEAM-1483] Support SetState in Flink runner and fix MapState to be consistent with InMemoryStateInternals.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10b166b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10b166b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10b166b3
Branch: refs/heads/master
Commit: 10b166b355a03daeae78dd1e71016fc72805939d
Parents: 4c36508
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jun 7 14:40:30 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 13 11:35:17 2017 +0200
----------------------------------------------------------------------
runners/flink/pom.xml | 1 -
.../streaming/state/FlinkStateInternals.java | 227 +++++++++++++++----
.../streaming/FlinkStateInternalsTest.java | 17 --
3 files changed, 182 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/10b166b3/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a5b8203..339aa8e 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -91,7 +91,6 @@
<excludedGroups>
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.LargeKeys$Above100MB,
- org.apache.beam.sdk.testing.UsesSetState,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream,
org.apache.beam.sdk.testing.UsesSplittableParDo
http://git-wip-us.apache.org/repos/asf/beam/blob/10b166b3/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index d8771de..a0b015b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
@@ -48,6 +50,7 @@ import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.joda.time.Instant;
@@ -127,8 +130,8 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public <T> SetState<T> bindSet(
StateTag<SetState<T>> address, Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
+ return new FlinkSetState<>(
+ flinkStateBackend, address, namespace, elemCoder);
}
@Override
@@ -875,24 +878,15 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public ReadableState<ValueT> get(final KeyT input) {
- return new ReadableState<ValueT>() {
- @Override
- public ValueT read() {
- try {
- return flinkStateBackend.getPartitionedState(
+ try {
+ return ReadableStates.immediate(
+ flinkStateBackend.getPartitionedState(
namespace.stringKey(),
StringSerializer.INSTANCE,
- flinkStateDescriptor).get(input);
- } catch (Exception e) {
- throw new RuntimeException("Error get from state.", e);
- }
- }
-
- @Override
- public ReadableState<ValueT> readLater() {
- return this;
- }
- };
+ flinkStateDescriptor).get(input));
+ } catch (Exception e) {
+ throw new RuntimeException("Error get from state.", e);
+ }
}
@Override
@@ -909,32 +903,22 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public ReadableState<ValueT> putIfAbsent(final KeyT key, final ValueT value) {
- return new ReadableState<ValueT>() {
- @Override
- public ValueT read() {
- try {
- ValueT current = flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).get(key);
-
- if (current == null) {
- flinkStateBackend.getPartitionedState(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- flinkStateDescriptor).put(key, value);
- }
- return current;
- } catch (Exception e) {
- throw new RuntimeException("Error put kv to state.", e);
- }
- }
+ try {
+ ValueT current = flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).get(key);
- @Override
- public ReadableState<ValueT> readLater() {
- return this;
+ if (current == null) {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).put(key, value);
}
- };
+ return ReadableStates.immediate(current);
+ } catch (Exception e) {
+ throw new RuntimeException("Error put kv to state.", e);
+ }
}
@Override
@@ -955,10 +939,11 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public Iterable<KeyT> read() {
try {
- return flinkStateBackend.getPartitionedState(
+ Iterable<KeyT> result = flinkStateBackend.getPartitionedState(
namespace.stringKey(),
StringSerializer.INSTANCE,
flinkStateDescriptor).keys();
+ return result != null ? result : Collections.<KeyT>emptyList();
} catch (Exception e) {
throw new RuntimeException("Error get map state keys.", e);
}
@@ -977,10 +962,11 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public Iterable<ValueT> read() {
try {
- return flinkStateBackend.getPartitionedState(
+ Iterable<ValueT> result = flinkStateBackend.getPartitionedState(
namespace.stringKey(),
StringSerializer.INSTANCE,
flinkStateDescriptor).values();
+ return result != null ? result : Collections.<ValueT>emptyList();
} catch (Exception e) {
throw new RuntimeException("Error get map state values.", e);
}
@@ -999,10 +985,11 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public Iterable<Map.Entry<KeyT, ValueT>> read() {
try {
- return flinkStateBackend.getPartitionedState(
+ Iterable<Map.Entry<KeyT, ValueT>> result = flinkStateBackend.getPartitionedState(
namespace.stringKey(),
StringSerializer.INSTANCE,
flinkStateDescriptor).entries();
+ return result != null ? result : Collections.<Map.Entry<KeyT, ValueT>>emptyList();
} catch (Exception e) {
throw new RuntimeException("Error get map state entries.", e);
}
@@ -1050,4 +1037,154 @@ public class FlinkStateInternals<K> implements StateInternals {
}
}
+ private static class FlinkSetState<T> implements SetState<T> {
+
+ private final StateNamespace namespace;
+ private final StateTag<SetState<T>> address;
+ private final MapStateDescriptor<T, Boolean> flinkStateDescriptor;
+ private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+ FlinkSetState(
+ KeyedStateBackend<ByteBuffer> flinkStateBackend,
+ StateTag<SetState<T>> address,
+ StateNamespace namespace,
+ Coder<T> coder) {
+ this.namespace = namespace;
+ this.address = address;
+ this.flinkStateBackend = flinkStateBackend;
+ this.flinkStateDescriptor = new MapStateDescriptor<>(address.getId(),
+ new CoderTypeSerializer<>(coder), new BooleanSerializer());
+ }
+
+ @Override
+ public ReadableState<Boolean> contains(final T t) {
+ try {
+ Boolean result = flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).get(t);
+ return ReadableStates.immediate(result != null ? result : false);
+ } catch (Exception e) {
+ throw new RuntimeException("Error contains value from state.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Boolean> addIfAbsent(final T t) {
+ try {
+ org.apache.flink.api.common.state.MapState<T, Boolean> state =
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor);
+ boolean alreadyContained = state.contains(t);
+ if (!alreadyContained) {
+ state.put(t, true);
+ }
+ return ReadableStates.immediate(!alreadyContained);
+ } catch (Exception e) {
+ throw new RuntimeException("Error addIfAbsent value to state.", e);
+ }
+ }
+
+ @Override
+ public void remove(T t) {
+ try {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).remove(t);
+ } catch (Exception e) {
+ throw new RuntimeException("Error remove value to state.", e);
+ }
+ }
+
+ @Override
+ public SetState<T> readLater() {
+ return this;
+ }
+
+ @Override
+ public void add(T value) {
+ try {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).put(value, true);
+ } catch (Exception e) {
+ throw new RuntimeException("Error add value to state.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public Boolean read() {
+ try {
+ Iterable<T> result = flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).keys();
+ return result == null || Iterables.isEmpty(result);
+ } catch (Exception e) {
+ throw new RuntimeException("Error isEmpty from state.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public Iterable<T> read() {
+ try {
+ Iterable<T> result = flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).keys();
+ return result != null ? result : Collections.<T>emptyList();
+ } catch (Exception e) {
+ throw new RuntimeException("Error read from state.", e);
+ }
+ }
+
+ @Override
+ public void clear() {
+ try {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).clear();
+ } catch (Exception e) {
+ throw new RuntimeException("Error clearing state.", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FlinkSetState<?> that = (FlinkSetState<?>) o;
+
+ return namespace.equals(that.namespace) && address.equals(that.address);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + address.hashCode();
+ return result;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10b166b3/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index e7564ec..b8d41de 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -63,21 +63,4 @@ public class FlinkStateInternalsTest extends StateInternalsTest {
}
}
- ///////////////////////// Unsupported tests \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
-
- @Override
- public void testSet() {}
-
- @Override
- public void testSetIsEmpty() {}
-
- @Override
- public void testMergeSetIntoSource() {}
-
- @Override
- public void testMergeSetIntoNewNamespace() {}
-
- @Override
- public void testMap() {}
-
}