You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:02 UTC
[02/53] [abbrv] beam git commit: jstorm-runner: rename the package to
org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
new file mode 100644
index 0000000..481b7fb
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.util;
+
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Default {@link ExecutionContext.StepContext} used when running a DoFn.
+ *
+ * <p>The context hands out the {@link StateInternals} and {@link TimerInternals} it was given.
+ * It does not track step or transform names (both getters return {@code null}), ignores output
+ * notifications, and rejects attempts to write side-input data.
+ */
+public class DefaultStepContext implements ExecutionContext.StepContext {
+
+  private TimerInternals timerInternals;
+
+  private StateInternals stateInternals;
+
+  /**
+   * Creates a step context backed by the given internals.
+   *
+   * @param timerInternals timer internals to expose; must not be {@code null}
+   * @param stateInternals state internals to expose; must not be {@code null}
+   * @throws NullPointerException if either argument is {@code null}
+   */
+  public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
+    this.timerInternals = checkNotNull(timerInternals, "timerInternals");
+    this.stateInternals = checkNotNull(stateInternals, "stateInternals");
+  }
+
+  /** Step names are not tracked by this context; always returns {@code null}. */
+  @Override
+  public String getStepName() {
+    return null;
+  }
+
+  /** Transform names are not tracked by this context; always returns {@code null}. */
+  @Override
+  public String getTransformName() {
+    return null;
+  }
+
+  /** No-op: output notifications are ignored. */
+  @Override
+  public void noteOutput(WindowedValue<?> windowedValue) {
+    // Intentionally empty.
+  }
+
+  /** No-op: tagged output notifications are ignored. */
+  @Override
+  public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+    // Intentionally empty.
+  }
+
+  /**
+   * Not supported by this context.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder)
+      throws IOException {
+    throw new UnsupportedOperationException("Writing side-input data is not supported.");
+  }
+
+  /** Returns the state internals currently held by this context. */
+  @Override
+  public StateInternals stateInternals() {
+    return stateInternals;
+  }
+
+  /** Returns the timer internals currently held by this context. */
+  @Override
+  public TimerInternals timerInternals() {
+    return timerInternals;
+  }
+
+  /**
+   * Replaces the state internals exposed by this context.
+   *
+   * @param stateInternals the new state internals; must not be {@code null}
+   * @throws NullPointerException if {@code stateInternals} is {@code null}
+   */
+  public void setStateInternals(StateInternals stateInternals) {
+    // Keep the non-null invariant established by the constructor.
+    this.stateInternals = checkNotNull(stateInternals, "stateInternals");
+  }
+
+  /**
+   * Replaces the timer internals exposed by this context.
+   *
+   * @param timerInternals the new timer internals; must not be {@code null}
+   * @throws NullPointerException if {@code timerInternals} is {@code null}
+   */
+  public void setTimerInternals(TimerInternals timerInternals) {
+    // Keep the non-null invariant established by the constructor.
+    this.timerInternals = checkNotNull(timerInternals, "timerInternals");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
new file mode 100644
index 0000000..cbf815a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.util;
+
+import org.apache.beam.runners.jstorm.translation.runtime.Executor;
+
+import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Static helper methods shared by the JStorm runner.
+ */
+public class RunnerUtils {
+  /**
+   * Converts a windowed key/value pair into a {@link KeyedWorkItem} holding that single element.
+   *
+   * <p>The key of the pair becomes the work item's key; the value of the pair is re-wrapped via
+   * {@link WindowedValue#withValue} on the input element and becomes the only element of the
+   * work item.
+   *
+   * @param elem the windowed {@code KV} to convert
+   * @param <K> key type
+   * @param <V> value type
+   * @return a work item keyed by {@code elem}'s key and containing {@code elem}'s value
+   */
+  public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
+    KV<K, V> kv = elem.getValue();
+    return SingletonKeyedWorkItem.of(kv.getKey(), elem.withValue(kv.getValue()));
+  }
+
+  /**
+   * Tells whether the given executor is one of {@link GroupByWindowExecutor},
+   * {@link StatefulDoFnExecutor} or {@link MultiStatefulDoFnExecutor}.
+   *
+   * @param executor the executor to inspect; {@code null} yields {@code false}
+   * @return {@code true} if {@code executor} is an instance of any of the three types
+   */
+  public static boolean isGroupByKeyExecutor(Executor executor) {
+    return executor instanceof GroupByWindowExecutor
+        || executor instanceof StatefulDoFnExecutor
+        || executor instanceof MultiStatefulDoFnExecutor;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
new file mode 100644
index 0000000..391699b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.jstorm.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Encapsulates the {@link PipelineOptions} in serialized form to ship them to the cluster.
+ *
+ * <p>The options are written to a byte array with Jackson when the wrapper is constructed. Only
+ * that byte array is part of the Java-serialized state; the deserialized options are held in a
+ * transient field and rebuilt on first access.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  /** Shared mapper, created once instead of once per serialize/deserialize call. */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private final byte[] serializedOptions;
+
+  /** Lazily initialized copy of the deserialized options. */
+  private transient PipelineOptions pipelineOptions;
+
+  /**
+   * Serializes the given options eagerly.
+   *
+   * @param options the options to wrap; must not be {@code null}
+   * @throws NullPointerException if {@code options} is {@code null}
+   * @throws RuntimeException if the options cannot be serialized
+   */
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      MAPPER.writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      // Any failure, checked or unchecked, is reported as one descriptive exception.
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+  }
+
+  /**
+   * Returns the wrapped options, deserializing them on the first call.
+   *
+   * <p>The lazy initialization is not synchronized, so concurrent first calls may each
+   * deserialize their own copy.
+   *
+   * @return the deserialized options
+   * @throws RuntimeException if the stored bytes cannot be deserialized
+   */
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = MAPPER.readValue(serializedOptions, PipelineOptions.class);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+      }
+    }
+
+    return pipelineOptions;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
new file mode 100644
index 0000000..dee5f1a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.util;
+
+import java.util.Collections;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A {@link KeyedWorkItem} that carries exactly one element and no timers.
+ *
+ * @param <K> type of the key
+ * @param <ElemT> type of the element wrapped in the {@link WindowedValue}
+ */
+public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+
+  final K key;
+  final WindowedValue<ElemT> value;
+
+  private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  /**
+   * Creates a work item for the given key holding the single given element.
+   *
+   * @param key the key of the work item
+   * @param value the only element of the work item
+   * @param <K> type of the key
+   * @param <ElemT> type of the element
+   * @return a new work item wrapping {@code key} and {@code value}
+   */
+  public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
+    return new SingletonKeyedWorkItem<>(key, value);
+  }
+
+  /** Returns the key this work item was created with. */
+  @Override
+  public K key() {
+    return key;
+  }
+
+  /** Returns the single element held by this work item. */
+  public WindowedValue<ElemT> value() {
+    return value;
+  }
+
+  /** Returns an empty iterable: this work item never carries timers. */
+  @Override
+  public Iterable<TimerInternals.TimerData> timersIterable() {
+    // Typed factory instead of the raw EMPTY_LIST constant, which needs an unchecked conversion.
+    return Collections.<TimerInternals.TimerData>emptyList();
+  }
+
+  /** Returns a one-element iterable containing {@link #value()}. */
+  @Override
+  public Iterable<WindowedValue<ElemT>> elementsIterable() {
+    return Collections.singletonList(value);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
deleted file mode 100644
index 0ecffff..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import avro.shaded.com.google.common.collect.Maps;
-import com.alibaba.jstorm.beam.translation.runtime.TimerServiceImpl;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
-import com.alibaba.jstorm.utils.KryoSerializer;
-
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link JStormStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class JStormStateInternalsTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- private JStormStateInternals<String> jstormStateInternals;
-
- @Before
- public void setup() throws Exception {
- IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
- Maps.newHashMap(),
- "test",
- tmp.toString(),
- new KryoSerializer(Maps.newHashMap()));
- jstormStateInternals = new JStormStateInternals("key-1", kvStoreManager, new TimerServiceImpl(), 0);
- }
-
- @Test
- public void testValueState() throws Exception {
- ValueState<Integer> valueState = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
- valueState.write(Integer.MIN_VALUE);
- assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
- valueState.write(Integer.MAX_VALUE);
- assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
- }
-
- @Test
- public void testValueStateIdenticalId() throws Exception {
- ValueState<Integer> valueState = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
- ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-
- valueState.write(Integer.MIN_VALUE);
- assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
- assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
- valueState.write(Integer.MAX_VALUE);
- assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
- assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
- }
-
- @Test
- public void testBagState() throws Exception {
- BagState<Integer> bagStateA = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
- BagState<Integer> bagStateB = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
-
- bagStateA.add(1);
- bagStateA.add(0);
- bagStateA.add(Integer.MAX_VALUE);
-
- bagStateB.add(0);
- bagStateB.add(Integer.MIN_VALUE);
-
- Iterable<Integer> bagA = bagStateA.read();
- Iterable<Integer> bagB = bagStateB.read();
- assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
- assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
-
- bagStateA.clear();
- bagStateA.add(1);
- bagStateB.add(0);
- assertThat(bagStateA.read(), containsInAnyOrder(1));
- assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
- }
-
- @Test
- public void testCombiningState() throws Exception {
- Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
- Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
- CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
-
- CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
- StateNamespaces.global(),
- StateTags.combiningValue(
- "state-id-a",
- accumCoder,
- combineFn));
- assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
- combiningState.add(10);
- assertEquals(10, combiningState.read().longValue());
- combiningState.add(1);
- assertEquals(10, combiningState.read().longValue());
- combiningState.add(Integer.MAX_VALUE);
- assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
- }
-
- @Test
- public void testWatermarkHoldState() throws Exception {
- WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
- StateNamespaces.global(),
- StateTags.watermarkStateInternal(
- "state-id-a",
- TimestampCombiner.EARLIEST));
- watermarkHoldState.add(new Instant(1));
- assertEquals(1, watermarkHoldState.read().getMillis());
- watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
- assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
- watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
- assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
- }
-
- @Test
- public void testMapState() throws Exception {
- MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
- mapStateA.put(1, 1);
- mapStateA.put(2, 22);
- mapStateA.put(1, 12);
-
- Iterable<Integer> keys = mapStateA.keys().read();
- Iterable<Integer> values = mapStateA.values().read();
- assertThat(keys, containsInAnyOrder(1, 2));
- assertThat(values, containsInAnyOrder(12, 22));
-
- Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
- Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
- Map.Entry<Integer, Integer> entry = itr.next();
- assertEquals((long) entry.getKey(), 1l);
- assertEquals((long) entry.getValue(), 12l);
- entry = itr.next();
- assertEquals((long) entry.getKey(), 2l);
- assertEquals((long) entry.getValue(), 22l);
- assertEquals(false, itr.hasNext());
-
- mapStateA.remove(1);
- keys = mapStateA.keys().read();
- values = mapStateA.values().read();
- assertThat(keys, containsInAnyOrder(2));
- assertThat(values, containsInAnyOrder(22));
-
- entries = mapStateA.entries().read();
- itr = entries.iterator();
- entry = itr.next();
- assertEquals((long) entry.getKey(), 2l);
- assertEquals((long) entry.getValue(), 22l);
- assertEquals(false, itr.hasNext());
- }
-
- @Test
- public void testMassiveDataOfBagState() {
- BagState<Integer> bagStateA = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-
- int count = 10000;
- int n = 1;
- while(n <= count) {
- bagStateA.add(n);
- n++;
- }
-
- int readCount = 0;
- int readN = 0;
- Iterator<Integer> itr = bagStateA.read().iterator();
- while(itr.hasNext()) {
- readN += itr.next();
- readCount++;
- }
-
- assertEquals((long) readN, ((1 + count) * count) / 2);
- assertEquals((long) readCount, count);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java
deleted file mode 100644
index 4f69c93..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-@RunWith(JUnit4.class)
-public class CoGroupByKeyTest implements Serializable {
- /**
- * Converts the given list into a PCollection belonging to the provided
- * Pipeline in such a way that coder inference needs to be performed.
- */
- private PCollection<KV<Integer, String>> createInput(String name,
- Pipeline p, List<KV<Integer, String>> list) {
- return createInput(name, p, list, new ArrayList<Long>());
- }
-
- /**
- * Converts the given list with timestamps into a PCollection.
- */
- private PCollection<KV<Integer, String>> createInput(String name,
- Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) {
- PCollection<KV<Integer, String>> input;
- if (timestamps.isEmpty()) {
- input = p.apply("Create" + name, Create.of(list)
- .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
- } else {
- input = p.apply("Create" + name, Create.timestamped(list, timestamps)
- .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
- }
- return input.apply(
- "Identity" + name,
- ParDo.of(
- new DoFn<KV<Integer, String>, KV<Integer, String>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- }));
- }
-
- /**
- * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result
- * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
- * where each {@link PCollection} has no duplicate keys and the key sets of
- * each {@link PCollection} are intersecting but neither is a subset of the other.
- */
- private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(
- Pipeline p,
- TupleTag<String> tag1,
- TupleTag<String> tag2) {
- List<KV<Integer, String>> list1 =
- Arrays.asList(
- KV.of(1, "collection1-1"),
- KV.of(2, "collection1-2"));
- List<KV<Integer, String>> list2 =
- Arrays.asList(
- KV.of(2, "collection2-2"),
- KV.of(3, "collection2-3"));
- PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1);
- PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2);
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- KeyedPCollectionTuple.of(tag1, collection1)
- .and(tag2, collection2)
- .apply(CoGroupByKey.<Integer>create());
- return coGbkResults;
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testCoGroupByKeyGetOnly() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
-
- final TupleTag<String> tag1 = new TupleTag<>();
- final TupleTag<String> tag2 = new TupleTag<>();
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- buildGetOnlyGbk(p, tag1, tag2);
-
- PAssert.thatMap(coGbkResults).satisfies(
- new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
- @Override
- public Void apply(Map<Integer, CoGbkResult> results) {
- assertEquals("collection1-1", results.get(1).getOnly(tag1));
- assertEquals("collection1-2", results.get(2).getOnly(tag1));
- assertEquals("collection2-2", results.get(2).getOnly(tag2));
- assertEquals("collection2-3", results.get(3).getOnly(tag2));
- return null;
- }
- });
-
- p.run();
- }
-
- /**
- * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
- * results of the {@code CoGroupByKey} over three
- * {@code PCollection<KV<Integer, String>>}, each of which correlates
- * a customer id to purchases, addresses, or names, respectively.
- */
- private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk(
- Pipeline p,
- TupleTag<String> purchasesTag,
- TupleTag<String> addressesTag,
- TupleTag<String> namesTag) {
- List<KV<Integer, String>> idToPurchases =
- Arrays.asList(
- KV.of(2, "Boat"),
- KV.of(1, "Shoes"),
- KV.of(3, "Car"),
- KV.of(1, "Book"),
- KV.of(10, "Pens"),
- KV.of(8, "House"),
- KV.of(4, "Suit"),
- KV.of(11, "House"),
- KV.of(14, "Shoes"),
- KV.of(2, "Suit"),
- KV.of(8, "Suit Case"),
- KV.of(3, "House"));
-
- List<KV<Integer, String>> idToAddress =
- Arrays.asList(
- KV.of(2, "53 S. 3rd"),
- KV.of(10, "383 Jackson Street"),
- KV.of(20, "3 W. Arizona"),
- KV.of(3, "29 School Rd"),
- KV.of(8, "6 Watling Rd"));
-
- List<KV<Integer, String>> idToName =
- Arrays.asList(
- KV.of(1, "John Smith"),
- KV.of(2, "Sally James"),
- KV.of(8, "Jeffery Spalding"),
- KV.of(20, "Joan Lichtfield"));
-
- PCollection<KV<Integer, String>> purchasesTable =
- createInput("CreateIdToPurchases", p, idToPurchases);
-
- PCollection<KV<Integer, String>> addressTable =
- createInput("CreateIdToAddress", p, idToAddress);
-
- PCollection<KV<Integer, String>> nameTable =
- createInput("CreateIdToName", p, idToName);
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- KeyedPCollectionTuple.of(namesTag, nameTable)
- .and(addressesTag, addressTable)
- .and(purchasesTag, purchasesTable)
- .apply(CoGroupByKey.<Integer>create());
- return coGbkResults;
- }
-
- /**
- * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
- * results of the {@code CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
- * each of which correlates a customer id to clicks, purchases, respectively.
- */
- private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing(
- Pipeline p,
- TupleTag<String> clicksTag,
- TupleTag<String> purchasesTag) {
- List<KV<Integer, String>> idToClick =
- Arrays.asList(
- KV.of(1, "Click t0"),
- KV.of(2, "Click t2"),
- KV.of(1, "Click t4"),
- KV.of(1, "Click t6"),
- KV.of(2, "Click t8"));
-
- List<KV<Integer, String>> idToPurchases =
- Arrays.asList(
- KV.of(1, "Boat t1"),
- KV.of(1, "Shoesi t2"),
- KV.of(1, "Pens t3"),
- KV.of(2, "House t4"),
- KV.of(2, "Suit t5"),
- KV.of(1, "Car t6"),
- KV.of(1, "Book t7"),
- KV.of(2, "House t8"),
- KV.of(2, "Shoes t9"),
- KV.of(2, "House t10"));
-
- PCollection<KV<Integer, String>> clicksTable =
- createInput("CreateClicks",
- p,
- idToClick,
- Arrays.asList(0L, 2L, 4L, 6L, 8L))
- .apply("WindowClicks", Window.<KV<Integer, String>>into(
- FixedWindows.of(new Duration(4)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
- PCollection<KV<Integer, String>> purchasesTable =
- createInput("CreatePurchases",
- p,
- idToPurchases,
- Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
- .apply("WindowPurchases", Window.<KV<Integer, String>>into(
- FixedWindows.of(new Duration(4)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- KeyedPCollectionTuple.of(clicksTag, clicksTable)
- .and(purchasesTag, purchasesTable)
- .apply(CoGroupByKey.<Integer>create());
- return coGbkResults;
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testCoGroupByKey() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
-
- final TupleTag<String> namesTag = new TupleTag<>();
- final TupleTag<String> addressesTag = new TupleTag<>();
- final TupleTag<String> purchasesTag = new TupleTag<>();
-
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
-
- PAssert.thatMap(coGbkResults).satisfies(
- new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
- @Override
- public Void apply(Map<Integer, CoGbkResult> results) {
- CoGbkResult result1 = results.get(1);
- assertEquals("John Smith", result1.getOnly(namesTag));
- assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book"));
-
- CoGbkResult result2 = results.get(2);
- assertEquals("Sally James", result2.getOnly(namesTag));
- assertEquals("53 S. 3rd", result2.getOnly(addressesTag));
- assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat"));
-
- CoGbkResult result3 = results.get(3);
- assertEquals("29 School Rd", result3.getOnly(addressesTag), "29 School Rd");
- assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House"));
-
- CoGbkResult result8 = results.get(8);
- assertEquals("Jeffery Spalding", result8.getOnly(namesTag));
- assertEquals("6 Watling Rd", result8.getOnly(addressesTag));
- assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case"));
-
- CoGbkResult result20 = results.get(20);
- assertEquals("Joan Lichtfield", result20.getOnly(namesTag));
- assertEquals("3 W. Arizona", result20.getOnly(addressesTag));
-
- assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag));
-
- assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit"));
- assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens"));
- assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House"));
- assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes"));
-
- return null;
- }
- });
-
- p.run();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java
deleted file mode 100644
index 5ec6636..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-
-import com.alibaba.jstorm.beam.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link GroupByKey} with {@link com.alibaba.jstorm.beam.StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class GroupByKeyTest {
-
- static final String[] WORDS_ARRAY = new String[] {
- "hi", "there", "hi", "hi", "sue", "bob",
- "hi", "sue", "", "", "ZOW", "bob", "" };
-
- static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
- @Test
- public void testGroupByKey() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
-
- List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
- KV.of("k1", 3),
- KV.of("k5", Integer.MAX_VALUE),
- KV.of("k5", Integer.MIN_VALUE),
- KV.of("k2", 66),
- KV.of("k1", 4),
- KV.of("k2", -33),
- KV.of("k3", 0));
-
- PCollection<KV<String, Integer>> input =
- p.apply(Create.of(ungroupedPairs)
- .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
-
- PCollection<KV<String, Iterable<Integer>>> output =
- input.apply(GroupByKey.<String, Integer>create());
-
- PAssert.that(output)
- .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey());
-
- p.run();
- }
-
- @Test
- public void testCountGloballyBasic() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
- PCollection<String> input = p.apply(Create.of(WORDS));
-
- PCollection<Long> output =
- input.apply(Count.<String>globally());
-
- PAssert.that(output)
- .containsInAnyOrder(13L);
- p.run();
- }
-
- static class AssertThatHasExpectedContentsForTestGroupByKey
- implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
- Void> {
- @Override
- public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
- assertThat(actual, containsInAnyOrder(
- KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)),
- KvMatcher.isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
- Integer.MIN_VALUE)),
- KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)),
- KvMatcher.isKv(is("k3"), containsInAnyOrder(0))));
- return null;
- }
- }
-
- /**
- * Matcher for KVs.
- */
- public static class KvMatcher<K, V>
- extends TypeSafeMatcher<KV<? extends K, ? extends V>> {
- final Matcher<? super K> keyMatcher;
- final Matcher<? super V> valueMatcher;
-
- public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher,
- Matcher<V> valueMatcher) {
- return new KvMatcher<>(keyMatcher, valueMatcher);
- }
-
- public KvMatcher(Matcher<? super K> keyMatcher,
- Matcher<? super V> valueMatcher) {
- this.keyMatcher = keyMatcher;
- this.valueMatcher = valueMatcher;
- }
-
- @Override
- public boolean matchesSafely(KV<? extends K, ? extends V> kv) {
- return keyMatcher.matches(kv.getKey())
- && valueMatcher.matches(kv.getValue());
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a KV(").appendValue(keyMatcher)
- .appendText(", ").appendValue(valueMatcher)
- .appendText(")");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java
deleted file mode 100644
index da0aafe..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java
+++ /dev/null
@@ -1,626 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-
-import com.alibaba.jstorm.beam.TestJStormRunner;
-import com.google.common.base.MoreObjects;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.*;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.UsesMapState;
-import org.apache.beam.sdk.testing.UsesStatefulParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.transforms.windowing.*;
-import org.apache.beam.sdk.values.*;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.*;
-
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link ParDo} with {@link com.alibaba.jstorm.beam.StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class ParDoTest implements Serializable {
-
- @Test
- public void testParDo() throws IOException {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- List<Integer> inputs = Arrays.asList(3, -42, 666);
-
- PCollection<String> output = pipeline
- .apply(Create.of(inputs))
- .apply(ParDo.of(new TestDoFn()));
-
- PAssert.that(output)
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
- pipeline.run();
- }
-
- @Test
- public void testParDoWithSideInputs() throws IOException {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- List<Integer> inputs = Arrays.asList(3, -42, 666);
-
- PCollectionView<Integer> sideInput1 = pipeline
- .apply("CreateSideInput1", Create.of(11))
- .apply("ViewSideInput1", View.<Integer>asSingleton());
- PCollectionView<Integer> sideInputUnread = pipeline
- .apply("CreateSideInputUnread", Create.of(-3333))
- .apply("ViewSideInputUnread", View.<Integer>asSingleton());
-
- PCollectionView<Integer> sideInput2 = pipeline
- .apply("CreateSideInput2", Create.of(222))
- .apply("ViewSideInput2", View.<Integer>asSingleton());
- PCollection<String> output = pipeline
- .apply(Create.of(inputs))
- .apply(ParDo.of(new TestDoFn(
- Arrays.asList(sideInput1, sideInput2),
- Arrays.<TupleTag<String>>asList()))
- .withSideInputs(sideInput1, sideInputUnread, sideInput2));
-
- PAssert.that(output)
- .satisfies(ParDoTest.HasExpectedOutput
- .forInput(inputs)
- .andSideInputs(11, 222));
-
- pipeline.run();
- }
-
- @Test
- public void testParDoWithTaggedOutput() {
- List<Integer> inputs = Arrays.asList(3, -42, 666);
-
- TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
- TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
- TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
- TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
-
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- PCollectionTuple outputs = pipeline
- .apply(Create.of(inputs))
- .apply(ParDo
- .of(new TestDoFn(
- Arrays.<PCollectionView<Integer>>asList(),
- Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
- .withOutputTags(
- mainOutputTag,
- TupleTagList.of(additionalOutputTag3)
- .and(additionalOutputTag1)
- .and(additionalOutputTagUnwritten)
- .and(additionalOutputTag2)));
-
- PAssert.that(outputs.get(mainOutputTag))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
- PAssert.that(outputs.get(additionalOutputTag1))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromOutput(additionalOutputTag1));
- PAssert.that(outputs.get(additionalOutputTag2))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromOutput(additionalOutputTag2));
- PAssert.that(outputs.get(additionalOutputTag3))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromOutput(additionalOutputTag3));
- PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();
-
- pipeline.run();
- }
-
- @Test
- public void testNoWindowFnDoesNotReassignWindows() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final PCollection<Long> initialWindows =
- pipeline
- .apply(GenerateSequence.from(0).to(10))
- .apply("AssignWindows", Window.into(new WindowOddEvenBuckets()));
-
- // Sanity check the window assignment to demonstrate the baseline
- PAssert.that(initialWindows)
- .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
- .containsInAnyOrder(0L, 2L, 4L, 6L, 8L);
- PAssert.that(initialWindows)
- .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
- .containsInAnyOrder(1L, 3L, 5L, 7L, 9L);
-
- PCollection<Boolean> upOne =
- initialWindows.apply(
- "ModifyTypes",
- MapElements.<Long, Boolean>via(
- new SimpleFunction<Long, Boolean>() {
- @Override
- public Boolean apply(Long input) {
- return input % 2 == 0;
- }
- }));
- PAssert.that(upOne)
- .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
- .containsInAnyOrder(true, true, true, true, true);
- PAssert.that(upOne)
- .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
- .containsInAnyOrder(false, false, false, false, false);
-
- // The elements should be in the same windows, even though they would not be assigned to the
- // same windows with the updated timestamps. If we try to apply the original WindowFn, the type
- // will not be appropriate and the runner should crash, as a Boolean cannot be converted into
- // a long.
- PCollection<Boolean> updatedTrigger =
- upOne.apply(
- "UpdateWindowingStrategy",
- Window.<Boolean>configure().triggering(Never.ever())
- .withAllowedLateness(Duration.ZERO)
- .accumulatingFiredPanes());
- pipeline.run();
- }
-
- @Test
- public void testValueStateSameId() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final String stateId = "foo";
-
- DoFn<KV<String, Integer>, KV<String, Integer>> fn =
- new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value(VarIntCoder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
- Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
- c.output(KV.of("sizzle", currentValue));
- state.write(currentValue + 1);
- }
- };
-
- DoFn<KV<String, Integer>, Integer> fn2 =
- new DoFn<KV<String, Integer>, Integer>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value(VarIntCoder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
- Integer currentValue = MoreObjects.firstNonNull(state.read(), 13);
- c.output(currentValue);
- state.write(currentValue + 13);
- }
- };
-
- PCollection<KV<String, Integer>> intermediate =
- pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
- .apply("First stateful ParDo", ParDo.of(fn));
-
- PCollection<Integer> output =
- intermediate.apply("Second stateful ParDo", ParDo.of(fn2));
-
- PAssert.that(intermediate)
- .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2));
- PAssert.that(output).containsInAnyOrder(13, 26, 39);
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
- public void testValueStateTaggedOutput() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final String stateId = "foo";
-
- final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
- final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
-
- DoFn<KV<String, Integer>, Integer> fn =
- new DoFn<KV<String, Integer>, Integer>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value(VarIntCoder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
- Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
- if (currentValue % 2 == 0) {
- c.output(currentValue);
- } else {
- c.output(oddTag, currentValue);
- }
- state.write(currentValue + 1);
- }
- };
-
- PCollectionTuple output =
- pipeline.apply(
- Create.of(
- KV.of("hello", 42),
- KV.of("hello", 97),
- KV.of("hello", 84),
- KV.of("goodbye", 33),
- KV.of("hello", 859),
- KV.of("goodbye", 83945)))
- .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));
-
- PCollection<Integer> evens = output.get(evenTag);
- PCollection<Integer> odds = output.get(oddTag);
-
- // There are 0 and 2 from "hello" and just 0 from "goodbye"
- PAssert.that(evens).containsInAnyOrder(0, 2, 0);
-
- // There are 1 and 3 from "hello" and just "1" from "goodbye"
- PAssert.that(odds).containsInAnyOrder(1, 3, 1);
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
- public void testMapStateCoderInference() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final String stateId = "foo";
- final String countStateId = "count";
- Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
- pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
-
- DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
- new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
-
- @StateId(stateId)
- private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
-
- @StateId(countStateId)
- private final StateSpec<CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
- Sum.ofIntegers());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
- @StateId(countStateId) CombiningState<Integer, int[], Integer>
- count) {
- KV<String, Integer> value = c.element().getValue();
- state.put(value.getKey(), new MyInteger(value.getValue()));
- count.add(1);
- if (count.read() >= 4) {
- Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
- for (Map.Entry<String, MyInteger> entry : iterate) {
- c.output(KV.of(entry.getKey(), entry.getValue()));
- }
- }
- }
- };
-
- PCollection<KV<String, MyInteger>> output =
- pipeline.apply(
- Create.of(
- KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
- KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
- .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder));
-
- PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)),
- KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12)));
- pipeline.run();
- }
-
-
- private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> {
- private static final IntervalWindow EVEN_WINDOW =
- new IntervalWindow(
- BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp());
- private static final IntervalWindow ODD_WINDOW =
- new IntervalWindow(
- BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1));
-
- @Override
- public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
- if (c.element() % 2 == 0) {
- return Collections.singleton(EVEN_WINDOW);
- }
- return Collections.singleton(ODD_WINDOW);
- }
-
- @Override
- public boolean isCompatible(WindowFn<?, ?> other) {
- return other instanceof WindowOddEvenBuckets;
- }
-
- @Override
- public Coder<IntervalWindow> windowCoder() {
- return new IntervalWindow.IntervalWindowCoder();
- }
-
- @Override
- public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
- throw new UnsupportedOperationException(
- String.format("Can't use %s for side inputs", getClass().getSimpleName()));
- }
- }
-
-
- static class TestDoFn extends DoFn<Integer, String> {
- enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED}
-
- State state = State.NOT_SET_UP;
-
- final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
- final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();
-
- public TestDoFn() {
- }
-
- public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
- List<TupleTag<String>> additionalOutputTupleTags) {
- this.sideInputViews.addAll(sideInputViews);
- this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
- }
-
- @Setup
- public void prepare() {
- assertEquals(State.NOT_SET_UP, state);
- state = State.UNSTARTED;
- }
-
- @StartBundle
- public void startBundle() {
- assertThat(state,
- anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));
-
- state = State.STARTED;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- System.out.println("Recv elem: " + c.element());
- assertThat(state,
- anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
- state = State.PROCESSING;
- outputToAllWithSideInputs(c, "processing: " + c.element());
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext c) {
- assertThat(state,
- anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
- state = State.FINISHED;
- c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
- for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
- c.output(
- additionalOutputTupleTag,
- additionalOutputTupleTag.getId() + ": " + "finished",
- BoundedWindow.TIMESTAMP_MIN_VALUE,
- GlobalWindow.INSTANCE);
- }
- }
-
- private void outputToAllWithSideInputs(ProcessContext c, String value) {
- if (!sideInputViews.isEmpty()) {
- List<Integer> sideInputValues = new ArrayList<>();
- for (PCollectionView<Integer> sideInputView : sideInputViews) {
- sideInputValues.add(c.sideInput(sideInputView));
- }
- value += ": " + sideInputValues;
- }
- c.output(value);
- for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
- c.output(additionalOutputTupleTag,
- additionalOutputTupleTag.getId() + ": " + value);
- }
- }
- }
-
- private static class MyInteger implements Comparable<MyInteger> {
- private final int value;
-
- MyInteger(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof MyInteger)) {
- return false;
- }
-
- MyInteger myInteger = (MyInteger) o;
-
- return value == myInteger.value;
-
- }
-
- @Override
- public int hashCode() {
- return value;
- }
-
- @Override
- public int compareTo(MyInteger o) {
- return Integer.compare(this.getValue(), o.getValue());
- }
-
- @Override
- public String toString() {
- return "MyInteger{" + "value=" + value + '}';
- }
- }
-
- private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
- private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
-
- private final VarIntCoder delegate = VarIntCoder.of();
-
- public static MyIntegerCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(MyInteger value, OutputStream outStream)
- throws CoderException, IOException {
- delegate.encode(value.getValue(), outStream);
- }
-
- @Override
- public MyInteger decode(InputStream inStream) throws CoderException,
- IOException {
- return new MyInteger(delegate.decode(inStream));
- }
- }
-
- /** PAssert "matcher" for expected output. */
- static class HasExpectedOutput
- implements SerializableFunction<Iterable<String>, Void>, Serializable {
- private final List<Integer> inputs;
- private final List<Integer> sideInputs;
- private final String additionalOutput;
- private final boolean ordered;
-
- public static HasExpectedOutput forInput(List<Integer> inputs) {
- return new HasExpectedOutput(
- new ArrayList<Integer>(inputs),
- new ArrayList<Integer>(),
- "",
- false);
- }
-
- private HasExpectedOutput(List<Integer> inputs,
- List<Integer> sideInputs,
- String additionalOutput,
- boolean ordered) {
- this.inputs = inputs;
- this.sideInputs = sideInputs;
- this.additionalOutput = additionalOutput;
- this.ordered = ordered;
- }
-
- public HasExpectedOutput andSideInputs(Integer... sideInputValues) {
- List<Integer> sideInputs = new ArrayList<>();
- for (Integer sideInputValue : sideInputValues) {
- sideInputs.add(sideInputValue);
- }
- return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered);
- }
-
- public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
- return fromOutput(outputTag.getId());
- }
- public HasExpectedOutput fromOutput(String outputId) {
- return new HasExpectedOutput(inputs, sideInputs, outputId, ordered);
- }
-
- public HasExpectedOutput inOrder() {
- return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true);
- }
-
- @Override
- public Void apply(Iterable<String> outputs) {
- List<String> processeds = new ArrayList<>();
- List<String> finisheds = new ArrayList<>();
- for (String output : outputs) {
- if (output.contains("finished")) {
- finisheds.add(output);
- } else {
- processeds.add(output);
- }
- }
-
- String sideInputsSuffix;
- if (sideInputs.isEmpty()) {
- sideInputsSuffix = "";
- } else {
- sideInputsSuffix = ": " + sideInputs;
- }
-
- String additionalOutputPrefix;
- if (additionalOutput.isEmpty()) {
- additionalOutputPrefix = "";
- } else {
- additionalOutputPrefix = additionalOutput + ": ";
- }
-
- List<String> expectedProcesseds = new ArrayList<>();
- for (Integer input : inputs) {
- expectedProcesseds.add(
- additionalOutputPrefix + "processing: " + input + sideInputsSuffix);
- }
- String[] expectedProcessedsArray =
- expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
- if (!ordered || expectedProcesseds.isEmpty()) {
- assertThat(processeds, containsInAnyOrder(expectedProcessedsArray));
- } else {
- assertThat(processeds, contains(expectedProcessedsArray));
- }
-
- for (String finished : finisheds) {
- assertEquals(additionalOutputPrefix + "finished", finished);
- }
-
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
new file mode 100644
index 0000000..11c7c94
--- /dev/null
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import avro.shaded.com.google.common.collect.Maps;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
+import com.alibaba.jstorm.utils.KryoSerializer;
+
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JStormStateInternals}.
+ *
+ * <p>{@link #setup()} runs before every test and builds a new {@link JStormStateInternals} for
+ * the key {@code "key-1"} on top of an {@link IKvStoreManager} obtained from
+ * {@link RocksDbKvStoreManagerFactory}.
+ */
+@RunWith(JUnit4.class)
+public class JStormStateInternalsTest {
+
+  @Rule
+  public final TemporaryFolder tmp = new TemporaryFolder();
+
+  private JStormStateInternals<String> jstormStateInternals;
+
+  /**
+   * Creates the state internals under test.
+   *
+   * <p>The third factory argument is the root directory of the {@link TemporaryFolder} rule
+   * (presumably used as the store location; confirm against the factory's contract).
+   * {@code TemporaryFolder} does not override {@code toString()}, so {@code tmp.toString()}
+   * would yield an identity string instead of the directory path; the path has to be taken from
+   * {@code getRoot()}.
+   */
+  @Before
+  public void setup() throws Exception {
+    IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
+        Maps.newHashMap(),
+        "test",
+        tmp.getRoot().getAbsolutePath(),
+        new KryoSerializer(Maps.newHashMap()));
+    jstormStateInternals = new JStormStateInternals<String>(
+        "key-1", kvStoreManager, new TimerServiceImpl(), 0);
+  }
+
+  /** A value state returns the most recently written value. */
+  @Test
+  public void testValueState() throws Exception {
+    ValueState<Integer> valueState = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+    valueState.write(Integer.MIN_VALUE);
+    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+    valueState.write(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+  }
+
+  /** Two handles obtained with the same namespace and state id see each other's writes. */
+  @Test
+  public void testValueStateIdenticalId() throws Exception {
+    ValueState<Integer> valueState = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+    ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+
+    valueState.write(Integer.MIN_VALUE);
+    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+    assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
+    valueState.write(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+    assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
+  }
+
+  /** Bags with different state ids are independent; clearing one leaves the other intact. */
+  @Test
+  public void testBagState() throws Exception {
+    BagState<Integer> bagStateA = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+    BagState<Integer> bagStateB = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
+
+    bagStateA.add(1);
+    bagStateA.add(0);
+    bagStateA.add(Integer.MAX_VALUE);
+
+    bagStateB.add(0);
+    bagStateB.add(Integer.MIN_VALUE);
+
+    Iterable<Integer> bagA = bagStateA.read();
+    Iterable<Integer> bagB = bagStateB.read();
+    assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
+    assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
+
+    bagStateA.clear();
+    bagStateA.add(1);
+    bagStateB.add(0);
+    assertThat(bagStateA.read(), containsInAnyOrder(1));
+    assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
+  }
+
+  /** A combining state backed by {@code Max.ofIntegers()} reports the running maximum. */
+  @Test
+  public void testCombiningState() throws Exception {
+    Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
+    Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
+        CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
+
+    CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.combiningValue(
+            "state-id-a",
+            accumCoder,
+            combineFn));
+    // Nothing added yet: the test expects Integer.MIN_VALUE as the initial result.
+    assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
+    combiningState.add(10);
+    assertEquals(10, combiningState.read().longValue());
+    combiningState.add(1);
+    assertEquals(10, combiningState.read().longValue());
+    combiningState.add(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
+  }
+
+  /** With {@code TimestampCombiner.EARLIEST} the hold stays at the smallest added instant. */
+  @Test
+  public void testWatermarkHoldState() throws Exception {
+    WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.watermarkStateInternal(
+            "state-id-a",
+            TimestampCombiner.EARLIEST));
+    watermarkHoldState.add(new Instant(1));
+    assertEquals(1, watermarkHoldState.read().getMillis());
+    watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
+    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+    watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
+    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+  }
+
+  /**
+   * Puts, overwrites and removes map entries, checking keys, values and entries after each step.
+   *
+   * <p>Entries are compared as a map so the test does not assume any iteration order.
+   */
+  @Test
+  public void testMapState() throws Exception {
+    MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
+    mapStateA.put(1, 1);
+    mapStateA.put(2, 22);
+    // Second put for key 1 must overwrite the first value.
+    mapStateA.put(1, 12);
+
+    Iterable<Integer> keys = mapStateA.keys().read();
+    Iterable<Integer> values = mapStateA.values().read();
+    assertThat(keys, containsInAnyOrder(1, 2));
+    assertThat(values, containsInAnyOrder(12, 22));
+
+    Map<Integer, Integer> entries = collectEntries(mapStateA.entries().read());
+    assertEquals(2, entries.size());
+    assertThat(entries, hasEntry(1, 12));
+    assertThat(entries, hasEntry(2, 22));
+
+    mapStateA.remove(1);
+    keys = mapStateA.keys().read();
+    values = mapStateA.values().read();
+    assertThat(keys, containsInAnyOrder(2));
+    assertThat(values, containsInAnyOrder(22));
+
+    entries = collectEntries(mapStateA.entries().read());
+    assertEquals(1, entries.size());
+    assertThat(entries, hasEntry(2, 22));
+  }
+
+  /** Adds the integers 1..10000 to a bag and checks that all of them are read back. */
+  @Test
+  public void testMassiveDataOfBagState() {
+    BagState<Integer> bagStateA = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+
+    int count = 10000;
+    for (int n = 1; n <= count; n++) {
+      bagStateA.add(n);
+    }
+
+    int readCount = 0;
+    long readSum = 0;
+    Iterator<Integer> itr = bagStateA.read().iterator();
+    while (itr.hasNext()) {
+      readSum += itr.next();
+      readCount++;
+    }
+
+    // Sum of 1..count, evaluated in long so that a larger count cannot overflow int.
+    assertEquals((1L + count) * count / 2, readSum);
+    assertEquals(count, readCount);
+  }
+
+  /**
+   * Copies the given entries into a map, failing if the same key is seen twice.
+   *
+   * @param entries the entries read from a map state
+   * @return a map holding exactly the given entries
+   */
+  private static Map<Integer, Integer> collectEntries(
+      Iterable<Map.Entry<Integer, Integer>> entries) {
+    Map<Integer, Integer> collected = Maps.newHashMap();
+    for (Map.Entry<Integer, Integer> entry : entries) {
+      Integer previous = collected.put(entry.getKey(), entry.getValue());
+      assertEquals("duplicate entry for key " + entry.getKey(), null, previous);
+    }
+    return collected;
+  }
+}