You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:10 UTC

[10/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
index 11c7c94..2a8160c 100644
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,18 +17,27 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime.state;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
 import avro.shaded.com.google.common.collect.Maps;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
 import com.alibaba.jstorm.utils.KryoSerializer;
-
+import java.util.Iterator;
+import java.util.Map;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -40,180 +49,174 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
 /**
  * Tests for {@link JStormStateInternals}.
  */
 @RunWith(JUnit4.class)
 public class JStormStateInternalsTest {
 
-    @Rule
-    public final TemporaryFolder tmp = new TemporaryFolder();
-
-    private JStormStateInternals<String> jstormStateInternals;
-
-    @Before
-    public void setup() throws Exception {
-        IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
-                Maps.newHashMap(),
-                "test",
-                tmp.toString(),
-                new KryoSerializer(Maps.newHashMap()));
-        jstormStateInternals = new JStormStateInternals("key-1", kvStoreManager, new TimerServiceImpl(), 0);
-    }
-
-    @Test
-    public void testValueState() throws Exception {
-        ValueState<Integer> valueState = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-        valueState.write(Integer.MIN_VALUE);
-        assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
-        valueState.write(Integer.MAX_VALUE);
-        assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
-    }
-
-    @Test
-    public void testValueStateIdenticalId() throws Exception {
-        ValueState<Integer> valueState = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-        ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-
-        valueState.write(Integer.MIN_VALUE);
-        assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
-        assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
-        valueState.write(Integer.MAX_VALUE);
-        assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
-        assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
+  @Rule
+  public final TemporaryFolder tmp = new TemporaryFolder();
+
+  private JStormStateInternals<String> jstormStateInternals;
+
+  @Before
+  public void setup() throws Exception {
+    IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
+        Maps.newHashMap(),
+        "test",
+        tmp.toString(),
+        new KryoSerializer(Maps.newHashMap()));
+    jstormStateInternals = new JStormStateInternals(
+        "key-1", kvStoreManager, new TimerServiceImpl(), 0);
+  }
+
+  @Test
+  public void testValueState() throws Exception {
+    ValueState<Integer> valueState = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+    valueState.write(Integer.MIN_VALUE);
+    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+    valueState.write(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+  }
+
+  @Test
+  public void testValueStateIdenticalId() throws Exception {
+    ValueState<Integer> valueState = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+    ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+
+    valueState.write(Integer.MIN_VALUE);
+    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+    assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
+    valueState.write(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+    assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
+  }
+
+  @Test
+  public void testBagState() throws Exception {
+    BagState<Integer> bagStateA = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+    BagState<Integer> bagStateB = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
+
+    bagStateA.add(1);
+    bagStateA.add(0);
+    bagStateA.add(Integer.MAX_VALUE);
+
+    bagStateB.add(0);
+    bagStateB.add(Integer.MIN_VALUE);
+
+    Iterable<Integer> bagA = bagStateA.read();
+    Iterable<Integer> bagB = bagStateB.read();
+    assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
+    assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
+
+    bagStateA.clear();
+    bagStateA.add(1);
+    bagStateB.add(0);
+    assertThat(bagStateA.read(), containsInAnyOrder(1));
+    assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
+  }
+
+  @Test
+  public void testCombiningState() throws Exception {
+    Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
+    Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
+        CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
+
+    CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.combiningValue(
+            "state-id-a",
+            accumCoder,
+            combineFn));
+    assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
+    combiningState.add(10);
+    assertEquals(10, combiningState.read().longValue());
+    combiningState.add(1);
+    assertEquals(10, combiningState.read().longValue());
+    combiningState.add(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
+  }
+
+  @Test
+  public void testWatermarkHoldState() throws Exception {
+    WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.watermarkStateInternal(
+            "state-id-a",
+            TimestampCombiner.EARLIEST));
+    watermarkHoldState.add(new Instant(1));
+    assertEquals(1, watermarkHoldState.read().getMillis());
+    watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
+    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+    watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
+    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+  }
+
+  @Test
+  public void testMapState() throws Exception {
+    MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
+    mapStateA.put(1, 1);
+    mapStateA.put(2, 22);
+    mapStateA.put(1, 12);
+
+    Iterable<Integer> keys = mapStateA.keys().read();
+    Iterable<Integer> values = mapStateA.values().read();
+    assertThat(keys, containsInAnyOrder(1, 2));
+    assertThat(values, containsInAnyOrder(12, 22));
+
+    Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
+    Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
+    Map.Entry<Integer, Integer> entry = itr.next();
+    assertEquals((long) entry.getKey(), 1l);
+    assertEquals((long) entry.getValue(), 12l);
+    entry = itr.next();
+    assertEquals((long) entry.getKey(), 2l);
+    assertEquals((long) entry.getValue(), 22l);
+    assertEquals(false, itr.hasNext());
+
+    mapStateA.remove(1);
+    keys = mapStateA.keys().read();
+    values = mapStateA.values().read();
+    assertThat(keys, containsInAnyOrder(2));
+    assertThat(values, containsInAnyOrder(22));
+
+    entries = mapStateA.entries().read();
+    itr = entries.iterator();
+    entry = itr.next();
+    assertEquals((long) entry.getKey(), 2l);
+    assertEquals((long) entry.getValue(), 22l);
+    assertEquals(false, itr.hasNext());
+  }
+
+  @Test
+  public void testMassiveDataOfBagState() {
+    BagState<Integer> bagStateA = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+
+    int count = 10000;
+    int n = 1;
+    while (n <= count) {
+      bagStateA.add(n);
+      n++;
     }
 
-    @Test
-    public void testBagState() throws Exception {
-        BagState<Integer> bagStateA = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-        BagState<Integer> bagStateB = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
-
-        bagStateA.add(1);
-        bagStateA.add(0);
-        bagStateA.add(Integer.MAX_VALUE);
-
-        bagStateB.add(0);
-        bagStateB.add(Integer.MIN_VALUE);
-
-        Iterable<Integer> bagA = bagStateA.read();
-        Iterable<Integer> bagB = bagStateB.read();
-        assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
-        assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
-
-        bagStateA.clear();
-        bagStateA.add(1);
-        bagStateB.add(0);
-        assertThat(bagStateA.read(), containsInAnyOrder(1));
-        assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
+    int readCount = 0;
+    int readN = 0;
+    Iterator<Integer> itr = bagStateA.read().iterator();
+    while (itr.hasNext()) {
+      readN += itr.next();
+      readCount++;
     }
 
-    @Test
-    public void testCombiningState() throws Exception {
-        Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
-        Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
-            CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
-
-        CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
-                StateNamespaces.global(),
-                StateTags.combiningValue(
-                        "state-id-a",
-                        accumCoder,
-                        combineFn));
-        assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
-        combiningState.add(10);
-        assertEquals(10, combiningState.read().longValue());
-        combiningState.add(1);
-        assertEquals(10, combiningState.read().longValue());
-        combiningState.add(Integer.MAX_VALUE);
-        assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
-    }
-
-    @Test
-    public void testWatermarkHoldState() throws Exception {
-        WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
-                StateNamespaces.global(),
-                StateTags.watermarkStateInternal(
-                        "state-id-a",
-                        TimestampCombiner.EARLIEST));
-        watermarkHoldState.add(new Instant(1));
-        assertEquals(1, watermarkHoldState.read().getMillis());
-        watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
-        assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
-        watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
-        assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
-    }
-
-    @Test
-    public void testMapState() throws Exception {
-        MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
-        mapStateA.put(1, 1);
-        mapStateA.put(2, 22);
-        mapStateA.put(1, 12);
-
-        Iterable<Integer> keys = mapStateA.keys().read();
-        Iterable<Integer> values = mapStateA.values().read();
-        assertThat(keys, containsInAnyOrder(1, 2));
-        assertThat(values, containsInAnyOrder(12, 22));
-
-        Iterable<Map.Entry<Integer, Integer>> entries =  mapStateA.entries().read();
-        Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
-        Map.Entry<Integer, Integer> entry = itr.next();
-        assertEquals((long) entry.getKey(), 1l);
-        assertEquals((long) entry.getValue(), 12l);
-        entry = itr.next();
-        assertEquals((long) entry.getKey(), 2l);
-        assertEquals((long) entry.getValue(), 22l);
-        assertEquals(false, itr.hasNext());
-
-        mapStateA.remove(1);
-        keys = mapStateA.keys().read();
-        values = mapStateA.values().read();
-        assertThat(keys, containsInAnyOrder(2));
-        assertThat(values, containsInAnyOrder(22));
-
-        entries =  mapStateA.entries().read();
-        itr = entries.iterator();
-        entry = itr.next();
-        assertEquals((long) entry.getKey(), 2l);
-        assertEquals((long) entry.getValue(), 22l);
-        assertEquals(false, itr.hasNext());
-    }
-
-    @Test
-    public void testMassiveDataOfBagState() {
-        BagState<Integer> bagStateA = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-
-        int count = 10000;
-        int n = 1;
-        while(n <= count) {
-            bagStateA.add(n);
-            n++;
-        }
-
-        int readCount = 0;
-        int readN = 0;
-        Iterator<Integer> itr = bagStateA.read().iterator();
-        while(itr.hasNext()) {
-            readN += itr.next();
-            readCount++;
-        }
-
-        assertEquals((long) readN, ((1 + count) * count) / 2);
-        assertEquals((long) readCount, count);
-    }
+    assertEquals((long) readN, ((1 + count) * count) / 2);
+    assertEquals((long) readCount, count);
+  }
 }