You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/19 18:51:18 UTC

[1/2] incubator-beam git commit: FileBasedSink: remove unused code

Repository: incubator-beam
Updated Branches:
  refs/heads/master 442435ed0 -> f20bf8afd


FileBasedSink: remove unused code


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/97a2578e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/97a2578e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/97a2578e

Branch: refs/heads/master
Commit: 97a2578ee82d38e576e2caa5cdece1e170eac817
Parents: 442435e
Author: Daniel Halperin <da...@halper.in>
Authored: Mon Apr 18 23:12:13 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 19 09:51:03 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 44 --------------------
 1 file changed, 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97a2578e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 391cb96..9297a78 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -21,13 +21,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.FileIOChannelFactory;
 import org.apache.beam.sdk.util.GcsIOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
@@ -35,8 +28,6 @@ import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 
 import com.google.api.client.googleapis.batch.BatchRequest;
 import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
@@ -828,39 +819,4 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       flushIfPossible();
     }
   }
-
-  static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      return input
-          // TODO: This would need to be adapted to write per-window shards.
-          .apply(Window.<T>into(new GlobalWindows())
-                       .triggering(DefaultTrigger.of())
-                       .discardingFiredPanes())
-          .apply("RandomKey", ParDo.of(
-              new DoFn<T, KV<Long, T>>() {
-                transient long counter, step;
-                @Override
-                public void startBundle(Context c) {
-                  counter = (long) (Math.random() * Long.MAX_VALUE);
-                  step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
-                }
-                @Override
-                public void processElement(ProcessContext c) {
-                  counter += step;
-                  c.output(KV.of(counter, c.element()));
-                }
-              }))
-          .apply(GroupByKey.<Long, T>create())
-          .apply("Ungroup", ParDo.of(
-              new DoFn<KV<Long, Iterable<T>>, T>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  for (T item : c.element().getValue()) {
-                    c.output(item);
-                  }
-                }
-              }));
-    }
-  }
 }


[2/2] incubator-beam git commit: Closes #208

Posted by dh...@apache.org.
Closes #208


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f20bf8af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f20bf8af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f20bf8af

Branch: refs/heads/master
Commit: f20bf8afd9b756721eba60562cf0c12526c8c13a
Parents: 442435e 97a2578
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 19 09:51:04 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 19 09:51:04 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 44 --------------------
 1 file changed, 44 deletions(-)
----------------------------------------------------------------------