You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/06 22:08:50 UTC

[1/2] incubator-beam git commit: Cleanup: move toFnWithContext() to CombineFnUtil

Repository: incubator-beam
Updated Branches:
  refs/heads/master e3f2d9564 -> 15a8334f9


Cleanup: move toFnWithContext() to CombineFnUtil


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d9dee3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d9dee3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d9dee3e

Branch: refs/heads/master
Commit: 0d9dee3e1028a96678b3108eff9ae9e650424c02
Parents: e3f2d95
Author: Pei He <pe...@gmail.com>
Authored: Thu Mar 17 15:14:33 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Apr 6 12:54:04 2016 -0700

----------------------------------------------------------------------
 .../dataflow/sdk/transforms/CombineFns.java     | 102 +------------------
 .../cloud/dataflow/sdk/util/CombineFnUtil.java  |  51 +++++++++-
 .../dataflow/sdk/util/CombineFnUtilTest.java    |  40 ++++++++
 3 files changed, 95 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0d9dee3e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
index 7af5292..8120de3 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
@@ -31,6 +31,7 @@ import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
 import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import com.google.cloud.dataflow.sdk.util.CombineFnUtil;
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.collect.ImmutableList;
@@ -371,7 +372,7 @@ public class CombineFns {
       checkUniqueness(outputTags, outputTag);
       List<CombineFnWithContext<Object, Object, Object>> fnsWithContext = Lists.newArrayList();
       for (CombineFn<Object, Object, Object> fn : combineFns) {
-        fnsWithContext.add(toFnWithContext(fn));
+        fnsWithContext.add(CombineFnUtil.toFnWithContext(fn));
       }
       return new ComposedCombineFnWithContext<>(
           ImmutableList.<SerializableFunction<DataT, ?>>builder()
@@ -512,7 +513,7 @@ public class CombineFns {
               .build(),
           ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
               .addAll(combineFnWithContexts)
-              .add(toFnWithContext(globalCombineFn))
+              .add(CombineFnUtil.toFnWithContext(globalCombineFn))
               .build(),
           ImmutableList.<TupleTag<?>>builder()
               .addAll(outputTags)
@@ -662,7 +663,7 @@ public class CombineFns {
       List<KeyedCombineFnWithContext<K, Object, Object, Object>> fnsWithContext =
           Lists.newArrayList();
       for (KeyedCombineFn<K, Object, Object, Object> fn : keyedCombineFns) {
-        fnsWithContext.add(toFnWithContext(fn));
+        fnsWithContext.add(CombineFnUtil.toFnWithContext(fn));
       }
       return new ComposedKeyedCombineFnWithContext<>(
           ImmutableList.<SerializableFunction<DataT, ?>>builder()
@@ -826,7 +827,7 @@ public class CombineFns {
               .build(),
           ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
               .addAll(keyedCombineFns)
-              .add(toFnWithContext(perKeyCombineFn))
+              .add(CombineFnUtil.toFnWithContext(perKeyCombineFn))
               .build(),
           ImmutableList.<TupleTag<?>>builder()
               .addAll(outputTags)
@@ -999,99 +1000,6 @@ public class CombineFns {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private static <InputT, AccumT, OutputT> CombineFnWithContext<InputT, AccumT, OutputT>
-  toFnWithContext(GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
-    if (globalCombineFn instanceof CombineFnWithContext) {
-      return (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn;
-    } else {
-      final CombineFn<InputT, AccumT, OutputT> combineFn =
-          (CombineFn<InputT, AccumT, OutputT>) globalCombineFn;
-      return new CombineFnWithContext<InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(Context c) {
-          return combineFn.createAccumulator();
-        }
-        @Override
-        public AccumT addInput(AccumT accumulator, InputT input, Context c) {
-          return combineFn.addInput(accumulator, input);
-        }
-        @Override
-        public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
-          return combineFn.mergeAccumulators(accumulators);
-        }
-        @Override
-        public OutputT extractOutput(AccumT accumulator, Context c) {
-          return combineFn.extractOutput(accumulator);
-        }
-        @Override
-        public AccumT compact(AccumT accumulator, Context c) {
-          return combineFn.compact(accumulator);
-        }
-        @Override
-        public OutputT defaultValue() {
-          return combineFn.defaultValue();
-        }
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return combineFn.getAccumulatorCoder(registry, inputCoder);
-        }
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(
-            CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return combineFn.getDefaultOutputCoder(registry, inputCoder);
-        }
-      };
-    }
-  }
-
-  private static <K, InputT, AccumT, OutputT> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
-  toFnWithContext(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      @SuppressWarnings("unchecked")
-      KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext =
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn;
-      return keyedCombineFnWithContext;
-    } else {
-      @SuppressWarnings("unchecked")
-      final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn =
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn;
-      return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(K key, Context c) {
-          return keyedCombineFn.createAccumulator(key);
-        }
-        @Override
-        public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) {
-          return keyedCombineFn.addInput(key, accumulator, value);
-        }
-        @Override
-        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
-          return keyedCombineFn.mergeAccumulators(key, accumulators);
-        }
-        @Override
-        public OutputT extractOutput(K key, AccumT accumulator, Context c) {
-          return keyedCombineFn.extractOutput(key, accumulator);
-        }
-        @Override
-        public AccumT compact(K key, AccumT accumulator, Context c) {
-          return keyedCombineFn.compact(key, accumulator);
-        }
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
-        }
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
-        }
-      };
-    }
-  }
-
   private static <OutputT> void checkUniqueness(
       List<TupleTag<?>> registeredTags, TupleTag<OutputT> outputTag) {
     checkArgument(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0d9dee3e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
index 097bae3..ed07efa 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -24,6 +23,7 @@ import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
 import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
 import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
 import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
 import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
@@ -105,6 +105,55 @@ public class CombineFnUtil {
     }
   }
 
+  /**
+   * Return a {@link KeyedCombineFnWithContext} from the given {@link PerKeyCombineFn}.
+   */
+  public static <K, InputT, AccumT, OutputT> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
+  toFnWithContext(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+      @SuppressWarnings("unchecked")
+      KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext =
+          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn;
+      return keyedCombineFnWithContext;
+    } else {
+      @SuppressWarnings("unchecked")
+      final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn =
+          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn;
+      return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
+        @Override
+        public AccumT createAccumulator(K key, Context c) {
+          return keyedCombineFn.createAccumulator(key);
+        }
+        @Override
+        public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) {
+          return keyedCombineFn.addInput(key, accumulator, value);
+        }
+        @Override
+        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
+          return keyedCombineFn.mergeAccumulators(key, accumulators);
+        }
+        @Override
+        public OutputT extractOutput(K key, AccumT accumulator, Context c) {
+          return keyedCombineFn.extractOutput(key, accumulator);
+        }
+        @Override
+        public AccumT compact(K key, AccumT accumulator, Context c) {
+          return keyedCombineFn.compact(key, accumulator);
+        }
+        @Override
+        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
+            Coder<InputT> inputCoder) throws CannotProvideCoderException {
+          return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
+        }
+        @Override
+        public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
+            Coder<InputT> inputCoder) throws CannotProvideCoderException {
+          return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
+        }
+      };
+    }
+  }
+
   private static class NonSerializableBoundedKeyedCombineFn<K, InputT, AccumT, OutputT>
       extends KeyedCombineFn<K, InputT, AccumT, OutputT> {
     private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0d9dee3e/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/CombineFnUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/CombineFnUtilTest.java
index 40b8900..173c1fa 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/CombineFnUtilTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/CombineFnUtilTest.java
@@ -17,11 +17,17 @@
  */
 package com.google.cloud.dataflow.sdk.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.withSettings;
 
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
 import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
 import com.google.cloud.dataflow.sdk.util.state.StateContexts;
+import com.google.common.collect.ImmutableList;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -33,6 +39,7 @@ import org.junit.runners.JUnit4;
 import java.io.ByteArrayOutputStream;
 import java.io.NotSerializableException;
 import java.io.ObjectOutputStream;
+import java.util.List;
 
 /**
  * Unit tests for {@link CombineFnUtil}.
@@ -61,4 +68,37 @@ public class CombineFnUtilTest {
     ObjectOutputStream oos = new ObjectOutputStream(buffer);
     oos.writeObject(CombineFnUtil.bindContext(mockCombineFn, StateContexts.nullContext()));
   }
+
+  @Test
+  public void testToFnWithContextIdempotent() throws Exception {
+    CombineFnWithContext<Integer, int[], Integer> fnWithContext =
+        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn());
+    assertTrue(fnWithContext == CombineFnUtil.toFnWithContext(fnWithContext));
+
+    KeyedCombineFnWithContext<Object, Integer, int[], Integer> keyedFnWithContext =
+        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn().asKeyedFn());
+    assertTrue(keyedFnWithContext == CombineFnUtil.toFnWithContext(keyedFnWithContext));
+  }
+
+  @Test
+  public void testToFnWithContext() throws Exception {
+    CombineFnWithContext<Integer, int[], Integer> fnWithContext =
+        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn());
+    List<Integer> inputs = ImmutableList.of(1, 2, 3, 4);
+    Context nullContext = CombineContextFactory.nullContext();
+    int[] accum = fnWithContext.createAccumulator(nullContext);
+    for (Integer i : inputs) {
+      accum = fnWithContext.addInput(accum, i, nullContext);
+    }
+    assertEquals(10, fnWithContext.extractOutput(accum, nullContext).intValue());
+
+    KeyedCombineFnWithContext<String, Integer, int[], Integer> keyedFnWithContext =
+        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn().<String>asKeyedFn());
+    String key = "key";
+    accum = keyedFnWithContext.createAccumulator(key, nullContext);
+    for (Integer i : inputs) {
+      accum = keyedFnWithContext.addInput(key, accum, i, nullContext);
+    }
+    assertEquals(10, keyedFnWithContext.extractOutput(key, accum, nullContext).intValue());
+  }
 }


[2/2] incubator-beam git commit: This closes #58

Posted by bc...@apache.org.
This closes #58


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15a8334f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15a8334f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15a8334f

Branch: refs/heads/master
Commit: 15a8334f90a43399bacf91b3eca6edd982229f20
Parents: e3f2d95 0d9dee3
Author: bchambers <bc...@google.com>
Authored: Wed Apr 6 12:54:11 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Apr 6 12:54:11 2016 -0700

----------------------------------------------------------------------
 .../dataflow/sdk/transforms/CombineFns.java     | 102 +------------------
 .../cloud/dataflow/sdk/util/CombineFnUtil.java  |  51 +++++++++-
 .../dataflow/sdk/util/CombineFnUtilTest.java    |  40 ++++++++
 3 files changed, 95 insertions(+), 98 deletions(-)
----------------------------------------------------------------------