You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:36 UTC

[36/53] [abbrv] beam git commit: jstorm-runner: Fix the failure of session window test cases in CombineTest

jstorm-runner:  Fix the failure of session window test cases in CombineTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52913b7e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52913b7e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52913b7e

Branch: refs/heads/jstorm-runner
Commit: 52913b7e2b01b4e6c65d96a10d745dd3e6739c83
Parents: 201ef72
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Thu Jul 20 14:37:29 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800

----------------------------------------------------------------------
 .../jstorm/translation/FlattenTranslator.java   |   1 -
 .../translation/JStormStateInternals.java       | 188 +++++++++++++++++--
 2 files changed, 176 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index 8f239bf..e104ad8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 68a17e5..90ef6d2 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateBinder;
 import org.apache.beam.sdk.state.StateContext;
 import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.Combine;
@@ -93,13 +94,14 @@ class JStormStateInternals<K> implements StateInternals {
   }
 
   @Override
-  public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+  public <T extends State> T state(final StateNamespace namespace, final StateTag<T> address) {
     return address.getSpec().bind(address.getId(), new StateBinder() {
       @Override
       public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
         try {
           return new JStormValueState<>(
-              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+              getStoreId(id), spec, getKey(), namespace,
+              kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
         } catch (IOException e) {
           throw new RuntimeException();
         }
@@ -109,7 +111,8 @@ class JStormStateInternals<K> implements StateInternals {
       public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
         try {
           return new JStormBagState(
-              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+              getStoreId(id), spec, getKey(), namespace,
+              kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
               kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
         } catch (IOException e) {
           throw new RuntimeException();
@@ -129,7 +132,8 @@ class JStormStateInternals<K> implements StateInternals {
           Coder<ValueT> mapValueCoder) {
         try {
           return new JStormMapState<>(
-              getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+              getStoreId(id), spec, (KeyT) getKey(), namespace,
+              kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
@@ -143,10 +147,11 @@ class JStormStateInternals<K> implements StateInternals {
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
         try {
           BagState<AccumT> accumBagState = new JStormBagState(
-              getKey(), namespace,
+              getStoreId(id), StateSpecs.<InputT>bag(), getKey(), namespace,
               kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
               kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-          return new JStormCombiningState<>(accumBagState, combineFn);
+          return new JStormCombiningState<>(
+              id, spec, namespace, accumBagState, combineFn);
         } catch (IOException e) {
           throw new RuntimeException();
         }
@@ -169,7 +174,7 @@ class JStormStateInternals<K> implements StateInternals {
           final TimestampCombiner timestampCombiner) {
         try {
           BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
-              getKey(), namespace,
+              getStoreId(id), StateSpecs.<Combine.Holder<Instant>>bag(), getKey(), namespace,
               kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
               kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
 
@@ -181,8 +186,11 @@ class JStormStateInternals<K> implements StateInternals {
                 }
               };
           return new JStormWatermarkHoldState(
-              namespace,
+              id, spec, namespace,
               new JStormCombiningState<>(
+                  getStoreId(id),
+                  StateSpecs.combining(outputTimeCombineFn),
+                  namespace,
                   accumBagState,
                   outputTimeCombineFn),
               timestampCombiner,
@@ -199,12 +207,21 @@ class JStormStateInternals<K> implements StateInternals {
    */
   private static class JStormValueState<K, T> implements ValueState<T> {
 
+    private final String id;
+    private final StateSpec<ValueState<T>> spec;
     @Nullable
     private final K key;
     private final StateNamespace namespace;
     private final IKvStore<ComposedKey, T> kvState;
 
-    JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+    JStormValueState(
+            String id,
+            StateSpec<ValueState<T>> spec,
+            @Nullable K key,
+            StateNamespace namespace,
+            IKvStore<ComposedKey, T> kvState) {
+      this.id = id;
+      this.spec = spec;
       this.key = key;
       this.namespace = namespace;
       this.kvState = kvState;
@@ -246,6 +263,29 @@ class JStormStateInternals<K> implements StateInternals {
       }
     }
 
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      JStormValueState<?, ?> that = (JStormValueState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + id.hashCode();
+      result = 31 * result + spec.hashCode();
+      return result;
+    }
+
     private ComposedKey getComposedKey() {
       return ComposedKey.of(key, namespace);
     }
@@ -256,6 +296,8 @@ class JStormStateInternals<K> implements StateInternals {
    */
   private static class JStormBagState<K, T> implements BagState<T> {
 
+    private final String id;
+    private final StateSpec<BagState<T>> spec;
     @Nullable
     private final K key;
     private final StateNamespace namespace;
@@ -263,10 +305,14 @@ class JStormStateInternals<K> implements StateInternals {
     private final IKvStore<ComposedKey, Object> stateInfoKvState;
 
     JStormBagState(
+        String id,
+        StateSpec<BagState<T>> spec,
         @Nullable K key,
         StateNamespace namespace,
         IKvStore<ComposedKey, T> kvState,
         IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+      this.id = id;
+      this.spec = spec;
       this.key = key;
       this.namespace = checkNotNull(namespace, "namespace");
       this.kvState = checkNotNull(kvState, "kvState");
@@ -350,8 +396,31 @@ class JStormStateInternals<K> implements StateInternals {
       } catch (IOException e) {
 
       }
-      return String.format("JStormBagState: key=%s, namespace=%s, elementIndex=%d",
-              key, namespace, elemIndex);
+      return String.format("stateId=%s, key=%s, namespace=%s, elementIndex=%d",
+              id, key, namespace, elemIndex);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      JStormBagState<?, ?> that = (JStormBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + id.hashCode();
+      result = 31 * result + spec.hashCode();
+      return result;
     }
 
     /**
@@ -420,13 +489,22 @@ class JStormStateInternals<K> implements StateInternals {
   private static class JStormCombiningState<InputT, AccumT, OutputT>
       implements CombiningState<InputT, AccumT, OutputT> {
 
+    private final String id;
+    private final StateSpec<CombiningState<InputT, AccumT, OutputT>> spec;
+    private final StateNamespace namespace;
     @Nullable
     private final BagState<AccumT> accumBagState;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
 
     JStormCombiningState(
+        String id,
+        StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+        StateNamespace namespace,
         BagState<AccumT> accumBagState,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+      this.id = id;
+      this.spec = spec;
+      this.namespace = namespace;
       this.accumBagState = checkNotNull(accumBagState, "accumBagState");
       this.combineFn = checkNotNull(combineFn, "combineFn");
     }
@@ -474,6 +552,29 @@ class JStormStateInternals<K> implements StateInternals {
     public void clear() {
       accumBagState.clear();
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      JStormCombiningState<?, ?, ?> that = (JStormCombiningState<?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + id.hashCode();
+      result = 31 * result + spec.hashCode();
+      return result;
+    }
   }
 
   /**
@@ -483,11 +584,19 @@ class JStormStateInternals<K> implements StateInternals {
    */
   private static class JStormMapState<K, V> implements MapState<K, V> {
 
+    private final String id;
+    private final StateSpec<MapState<K, V>> spec;
     private final K key;
     private final StateNamespace namespace;
     private IKvStore<K, V> kvStore;
 
-    JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+    JStormMapState(
+        String id,
+        StateSpec<MapState<K, V>> spec,
+        K key,
+        StateNamespace namespace, IKvStore<K, V> kvStore) {
+      this.id = id;
+      this.spec = spec;
       this.key = key;
       this.namespace = namespace;
       this.kvStore = kvStore;
@@ -582,6 +691,29 @@ class JStormStateInternals<K> implements StateInternals {
       }
     }
 
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      JStormMapState<?, ?> that = (JStormMapState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + id.hashCode();
+      result = 31 * result + spec.hashCode();
+      return result;
+    }
+
     private void reportError(String errorInfo, IOException e) {
       LOG.error(errorInfo, e);
       throw new RuntimeException(errorInfo);
@@ -611,16 +743,22 @@ class JStormStateInternals<K> implements StateInternals {
    */
   private static class JStormWatermarkHoldState implements WatermarkHoldState {
 
+    private final String id;
+    private final StateSpec<WatermarkHoldState> spec;
     private final StateNamespace namespace;
     private final GroupingState<Instant, Instant> watermarkHoldsState;
     private final TimestampCombiner timestampCombiner;
     private final TimerService timerService;
 
     JStormWatermarkHoldState(
+        String id,
+        StateSpec<WatermarkHoldState> spec,
         StateNamespace namespace,
         GroupingState<Instant, Instant> watermarkHoldsState,
         TimestampCombiner timestampCombiner,
         TimerService timerService) {
+      this.id = checkNotNull(id, "id");
+      this.spec = checkNotNull(spec, "spec");
       this.namespace = checkNotNull(namespace, "namespace");
       this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
       this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
@@ -659,6 +797,32 @@ class JStormStateInternals<K> implements StateInternals {
       timerService.clearWatermarkHold(namespace.stringKey());
       watermarkHoldsState.clear();
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      JStormWatermarkHoldState that = (JStormWatermarkHoldState) o;
+
+      return namespace.equals(that.namespace)
+          && id.equals(that.id)
+          && spec.equals(that.spec)
+          && timestampCombiner.equals(that.timestampCombiner);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + id.hashCode();
+      result = 31 * result + spec.hashCode();
+      result = 31 * result + timestampCombiner.hashCode();
+      return result;
+    }
   }
 
   private String getStoreId(String stateId) {