Posted to github@beam.apache.org by "kennknowles (via GitHub)" <gi...@apache.org> on 2024/03/06 17:00:45 UTC

[PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

kennknowles opened a new pull request, #30545:
URL: https://github.com/apache/beam/pull/30545

   This initial PR adds Redistribute transform(s) according to:
   
    - [Design doc for Redistribute](https://s.apache.org/beam-redistribute) ([dev@ thread](https://lists.apache.org/thread/rc4tcq7k0950cdgw9lyd0s7vkz8rz5s9))
    - [Design doc for Redistribute allowing duplicates](https://s.apache.org/beam-redistribute-allowing-duplicates) ([dev@ thread](https://lists.apache.org/thread/y7zzfc21804lwnwslbhvg6gq7scw2dso))
   
   This should make the Java API concrete. The Dataflow translation will likely also need revision, and I will need to add portable translation.
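
   For reviewers, a minimal usage sketch of the two entry points discussed in the design docs and in the thread below, `Redistribute.arbitrarily()` and `Redistribute.byKey()`. This is not code from the PR; the class name and the generic-parameter shapes are assumptions (mirroring `Reshuffle.viaRandomKey()` and `Reshuffle.of()`):

   ```java
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.Redistribute;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;

   public class RedistributeUsage {
     public static void main(String[] args) {
       Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

       // Redistribute.arbitrarily(): a hint that the runner may rebalance elements
       // across workers; the output PCollection contains exactly the input elements.
       PCollection<String> lines = p.apply(Create.of("a", "b", "c"));
       lines.apply("RedistributeArbitrarily", Redistribute.<String>arbitrarily());

       // Redistribute.byKey(): same element-preserving contract, applied to a keyed
       // PCollection so the runner may colocate elements that share a key.
       PCollection<KV<String, Integer>> keyed =
           p.apply(Create.of(KV.of("k1", 1), KV.of("k2", 2)));
       keyed.apply("RedistributeByKey", Redistribute.<String, Integer>byKey());

       p.run().waitUntilFinish();
     }
   }
   ```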
   
   *Note that the commits in the PR are independent and should not be squashed*
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1521012592


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   There seems to be no test for `Redistribute.byKey`. What is the invariant there? I suppose that (as far as only the test is concerned) we could do `Redistribute.byKey` -> `ParDo` and verify that elements with identical keys are not processed in parallel?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java:
##########
@@ -290,6 +296,26 @@ public FlinkPortablePipelineTranslator.Executor translate(
     return context;
   }
 
+  private static <K, V> void translateRedistributeByKey(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
+    DataSet<WindowedValue<KV<K, V>>> inputDataSet =
+        context.getDataSetOrThrow(
+            Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
+    context.addDataSet(
+        Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()),
+        inputDataSet.rebalance());

Review Comment:
   I wonder if it is OK to translate `REDISTRIBUTE_BY_KEY` and `REDISTRIBUTE_ARBITRARILY` the same way. Do the semantics actually differ? If yes, maybe semantically correct would be [DataSet#groupBy](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-) -> [DataSet#reduceGroup(identity)](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/operators/UnsortedGrouping.html#reduceGroup-org.apache.flink.api.common.functions.GroupReduceFunction-)?
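
   For illustration, a rough sketch of that alternative (not code from this PR; the class and method names are made up, and it keys by a hash of the Beam key so Flink does not need type information for `K` — collisions only affect partitioning, not correctness):

   ```java
   import java.util.Objects;
   import org.apache.beam.sdk.util.WindowedValue;
   import org.apache.beam.sdk.values.KV;
   import org.apache.flink.api.common.functions.GroupReduceFunction;
   import org.apache.flink.api.java.DataSet;
   import org.apache.flink.api.java.functions.KeySelector;
   import org.apache.flink.util.Collector;

   class RedistributeByKeyViaGroupBy {
     /** Groups by the Beam key and re-emits every element unchanged. */
     static <K, V> DataSet<WindowedValue<KV<K, V>>> translate(
         DataSet<WindowedValue<KV<K, V>>> input) {
       return input
           .groupBy(
               new KeySelector<WindowedValue<KV<K, V>>, Integer>() {
                 @Override
                 public Integer getKey(WindowedValue<KV<K, V>> value) {
                   // Hash of the Beam key, so equal keys land in the same group.
                   return Objects.hashCode(value.getValue().getKey());
                 }
               })
           .reduceGroup(
               new GroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, V>>>() {
                 @Override
                 public void reduce(
                     Iterable<WindowedValue<KV<K, V>>> values,
                     Collector<WindowedValue<KV<K, V>>> out) {
                   for (WindowedValue<KV<K, V>> value : values) {
                     out.collect(value); // identity: pass every element through
                   }
                 }
               })
           // Output type info is the same as the input's (Beam's CoderTypeInformation).
           .returns(input.getType());
     }
   }
   ```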



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -921,6 +926,38 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorStreaming<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>>> {
+
+    @Override
+    public void translateNode(
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
+      context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());

Review Comment:
   `keyBy`?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -283,6 +287,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   This might want to use `DataStream#keyBy`
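
   Something along these lines, perhaps (illustrative sketch only, with made-up names; a real translator would more likely key by encoded key bytes, but hashing the Beam key keeps the sketch self-contained):

   ```java
   import java.util.Objects;
   import org.apache.beam.sdk.util.WindowedValue;
   import org.apache.beam.sdk.values.KV;
   import org.apache.flink.api.common.typeinfo.Types;
   import org.apache.flink.api.java.functions.KeySelector;
   import org.apache.flink.streaming.api.datastream.DataStream;

   class RedistributeByKeyStreamingSketch {
     /**
      * keyBy() hash-partitions the stream so elements with equal Beam keys land on the
      * same downstream subtask, whereas rebalance() round-robins them arbitrarily.
      */
     static <K, V> DataStream<WindowedValue<KV<K, V>>> translate(
         DataStream<WindowedValue<KV<K, V>>> input) {
       KeySelector<WindowedValue<KV<K, V>>, Integer> keyByBeamKey =
           value -> Objects.hashCode(value.getValue().getKey());
       // Passing the key TypeInformation explicitly avoids lambda type-extraction issues.
       return input.keyBy(keyByBeamKey, Types.INT);
     }
   }
   ```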



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -422,6 +428,73 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.
+      // The output type coder may be relying on file system access. The shuffled data may have to
+      // be deserialized on a different machine using this coder where FileSystems has not been
+      // initialized.
+      final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet =
+          new MapOperator<>(
+              inputDataSet,
+              outputType,
+              FlinkIdentityFunction.of(),
+              getCurrentTransformName(context));
+      final Configuration partitionOptions = new Configuration();
+      partitionOptions.setString(
+          Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);

Review Comment:
   @dmvk Does this have the required effect? I'd think this applies to SQL optimizer, mostly. No?





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1522946352


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   If there are no semantic guarantees about how the key will be used, then it's probably fine. Personally I'd prefer to implement `.arbitrarily()` using `keyBy()` with the key being a hash of the value, but the fact that keys are co-located in partitions cannot (currently) be leveraged, so it probably does not matter.

   FYI, I (strangely, completely independently) found a bug in Flink's current batch reshuffle: https://github.com/apache/beam/pull/30622.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-1998810708

   The new translation for `Redistribute.arbitrarily()` in the portable batch Flink runner caused a problem. I think there may be a bug in how composites are implemented, because the translated pipeline still had transforms that consumed PCollections that had been eliminated. It is feature creep anyhow: just translating `Redistribute.byKey()` brings it to parity with Reshuffle. So I removed the special translation in the portable runner. Since it passed tests in the non-portable runner, I left it there.




Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1521279495


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java:
##########
@@ -290,6 +296,26 @@ public FlinkPortablePipelineTranslator.Executor translate(
     return context;
   }
 
+  private static <K, V> void translateRedistributeByKey(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
+    DataSet<WindowedValue<KV<K, V>>> inputDataSet =
+        context.getDataSetOrThrow(
+            Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
+    context.addDataSet(
+        Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()),
+        inputDataSet.rebalance());

Review Comment:
   I think for FlinkRunner today, all of Reshuffle, RedistributeArbitrarily, and RedistributeByKey are basically the same semantics. I do think that is a coincidence rather than a logical necessity, which is why I gave them each their own method.
   
   (these are basically just copy/paste of Reshuffle translation)





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2008089051

   OK this is now green. The test suites that are still running were green before and have not been touched. Please review!




Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-1981361829

   Adding R from my collaborators and people who spoke up on dev threads.
   
   R: @Naireen 
   R: @lostluck
   R: @je-ik 




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1531638179


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -921,6 +926,38 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorStreaming<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>>> {
+
+    @Override
+    public void translateNode(
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
+      context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());

Review Comment:
   Agree.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1531106246


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   @robertwb you might be the person to review the DataflowRunner translation? For Reshuffle we don't have a translator but a more complex rewrite to a specialized GroupByKey. I opted to _not_ do that this time but to translate more directly. I added ValidatesRunner tests for Redistribute that check parity with Reshuffle, at least in terms of that test suite.
   
   CC @scwhittle 





Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-1989279290

   OK, this is ready for review. I don't know whether you all are interested in reviewing it; if not, I will also CC @robertwb. I think the separate commits might be easier to review separately (I'm happy to put them in their own PRs, but just focusing a review on a particular code root is equivalent):
   
   1. model proto
   2. Java SDK impl with translation to proto
   3. Each runner - I did Dataflow first, but then Spark had test failures (I guess a basic GBK fails to preserve metadata), so I just went ahead and added translations to Spark, Flink, and Samza too.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2086869589

   Ah, I neglected the PVR suites.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1525097304


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java:
##########
@@ -290,6 +296,26 @@ public FlinkPortablePipelineTranslator.Executor translate(
     return context;
   }
 
+  private static <K, V> void translateRedistributeByKey(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
+    DataSet<WindowedValue<KV<K, V>>> inputDataSet =
+        context.getDataSetOrThrow(
+            Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
+    context.addDataSet(
+        Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()),
+        inputDataSet.rebalance());

Review Comment:
   Tried a rewrite. Still testing it, but I pushed the change up.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -422,6 +428,73 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.
+      // The output type coder may be relying on file system access. The shuffled data may have to
+      // be deserialized on a different machine using this coder where FileSystems has not been
+      // initialized.
+      final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet =
+          new MapOperator<>(
+              inputDataSet,
+              outputType,
+              FlinkIdentityFunction.of(),
+              getCurrentTransformName(context));
+      final Configuration partitionOptions = new Configuration();
+      partitionOptions.setString(
+          Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);

Review Comment:
   Adjusted to match your Reshuffle implementation.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1524444560


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   I got more information about the original change from `.rebalance()` to the hint. The problem was actually a bug in Flink: https://lists.apache.org/thread/tsbyzlk8p7m59qbgtgfm4rl0cx1rnz1j
   
   I will try to find out whether this has been resolved and whether we can use a simple `rebalance()`.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1531642528


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java:
##########
@@ -290,6 +296,26 @@ public FlinkPortablePipelineTranslator.Executor translate(
     return context;
   }
 
+  private static <K, V> void translateRedistributeByKey(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
+    DataSet<WindowedValue<KV<K, V>>> inputDataSet =
+        context.getDataSetOrThrow(
+            Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
+    context.addDataSet(
+        Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()),
+        inputDataSet.rebalance());

Review Comment:
   This can be resolved later.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1583039482


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   Yes, that's right. (And Reshuffle doesn't have a translator because it just has an override that uses a special windowing strategy as a flag value on the GBK to tell Dataflow v1 to reshuffle.)





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1523799085


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   Yea I agree with what you are saying. Some runners may have even better ways to redistribute arbitrarily (such as using dynamic runtime information to rebalance as it executes). But for Flink now I totally agree with your change, and I will try to mimic or leverage it.





Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-1989280287

   Redistribute itself and the ValidatesRunner tests are lifted straight from Reshuffle, so the levels of assurance are equivalent, FWIW.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2082643119

   Yea, I actually do feel strongly about the one remaining issue. I get what you are saying about directly re-using the reshuffle translator as a way of expressing "the reshuffle translation is good enough for this use case", and this is a point where I diverge from many programmers.
   
   One helpful way to think about it is to treat both the Reshuffle and Redistribute translations as perpetual works in progress. If someone goes in and messes with the "reshuffleTranslator" function in either Flink or Dataflow, I don't expect them to be aware of all the places it is used. They should edit with confidence that the blast radius is just Reshuffle. Having separate-but-coincidentally-currently-identical pieces of code achieves that. Contrast that with the case where we intend one to always evolve with the other, out of logical necessity. In that case we would also want editors to only be thinking about Reshuffle, and anything implemented via Reshuffle would pick up the changes, because we want it to. So in this particular case, I want the two to be as independent as possible. In fact, I want to re-deprecate Reshuffle as soon as we get RequiresStableInput implemented fully ;-) and I also want to drive all at-least-once usage to Redistribute.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2086781686

   Looks like several VR PostCommit suites are failing after this PR:
   
   https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_ULR.yml?query=event%3Aschedule
   
   https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml?query=event%3Aschedule
   
   https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml?query=event%3Aschedule
   
   https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Spark3_Streaming.yml?query=event%3Aschedule
   
   https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Spark_Batch.yml?query=event%3Aschedule
   
   
   
   example error
   
   https://github.com/apache/beam/actions/runs/8881963750/job/24385538170
   
   ```
   > Task :runners:spark:3:job-server:validatesPortableRunnerStreaming
   
   org.apache.beam.sdk.transforms.RedistributeTest > testRedistributeAfterSessionsAndGroupByKey FAILED
       java.lang.RuntimeException at RedistributeTest.java:265
   
   org.apache.beam.sdk.transforms.RedistributeTest > testRedistributeAfterFixedWindowsAndGroupByKey FAILED
       java.lang.RuntimeException at RedistributeTest.java:286
   
   org.apache.beam.sdk.transforms.RedistributeTest > testRedistributePreservesMetadata FAILED
       java.lang.RuntimeException at RedistributeTest.java:244
   
   org.apache.beam.sdk.transforms.RedistributeTest > testRedistributeAfterSlidingWindowsAndGroupByKey FAILED
       java.lang.RuntimeException at RedistributeTest.java:307
   
   org.apache.beam.sdk.transforms.RedistributeTest > testRedistributeAfterFixedWindows FAILED
       java.lang.RuntimeException at RedistributeTest.java:327
   
   org.apache.beam.sdk.transforms.RedistributeTest > testRedistributeAfterSlidingWindows FAILED
       java.lang.RuntimeException at RedistributeTest.java:347
   ```
   
   




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2009846203

   Seems that streaming wordcount is timing out at 900 seconds pretty reliably :-/
   
   I'll keep kicking that test, perhaps. Presumably it has about 1 second of compute and 15 minutes of turn-up/turn-down, though. Oof.




Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-1981363980

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1521397951


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   Ah yes, I translated all tests of `Reshuffle.of()` to `Redistribute.arbitrarily()`. FWIW, I think letting the runner control the redistribution is definitely the best technical direction, so that is the primary use case. But I changed one back so we have some explicit coverage. I could add more.
   
   The invariant for both transforms is simply that the output equals the input.
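
   A minimal sketch of what such a round-trip test looks like (illustrative only; the real coverage is in `RedistributeTest`, the class and test names here are made up, and the generic shapes of `arbitrarily()`/`byKey()` are assumed to mirror Reshuffle's):

   ```java
   import org.apache.beam.sdk.testing.PAssert;
   import org.apache.beam.sdk.testing.TestPipeline;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.Redistribute;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import org.junit.Rule;
   import org.junit.Test;

   public class RedistributeRoundTripTest {
     @Rule public final transient TestPipeline pipeline = TestPipeline.create();

     // The only invariant asserted is the one stated above: the output
     // PCollection contains exactly the input elements.
     @Test
     public void testArbitrarilyAndByKeyPreserveElements() {
       PCollection<KV<String, Integer>> input =
           pipeline.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)));

       PCollection<KV<String, Integer>> arbitrarily =
           input.apply("Arbitrarily", Redistribute.<KV<String, Integer>>arbitrarily());
       PCollection<KV<String, Integer>> byKey =
           input.apply("ByKey", Redistribute.<String, Integer>byKey());

       PAssert.that(arbitrarily).containsInAnyOrder(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));
       PAssert.that(byKey).containsInAnyOrder(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));

       pipeline.run();
     }
   }
   ```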





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1524979764


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   After offline discussion with @dmvk I think we can try to go back to the hint, but use the `_HASH` variant. According to the discussion this should be respected and have the required semantics (i.e. `Reshuffle` + `Redistribute.arbitrarily()`). I will try it next week.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1531103041


##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto:
##########
@@ -813,6 +813,10 @@ message GroupIntoBatchesPayload {
   int64 max_buffering_duration_millis = 2;
 }
 
+message RedistributePayload {

Review Comment:
   @robertwb pinging you on this very thrilling addition to the protos





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1541202528


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -423,6 +431,76 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.

Review Comment:
   I couldn't find anything. Guess I'll dive into the history of the line this is copied from and see if there was something clear. Might be obsolete too.



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   I don't understand the question. Does this help?
   
    - `Redistribute.arbitrarily()` has the same composite structure as `Reshuffle.viaRandomKey()`
    - `Redistribute.byKey()` has the same composite structure as `Reshuffle.of()`
    - This code here is the Dataflow v1 translation for `Redistribute.byKey()` which is simplified from the translation of `Reshuffle.of()`
    - The proposal for the future is to do even better for the `arbitrarily` case by having Dataflow expose a primitive rather than the existing kludge on top of GroupByKey.
   
   This thread didn't mention it, but now I realize there's a potential problem: the purpose of the override was to reduce the data shuffled by not reifying the timestamps, since they are available elsewhere in Dataflow-specific shuffle metadata. Now I'm on the fence, because I'd rather not rely on that always being the case (it would be update-incompatible to change later), whereas reifying all metadata in a standard way is robust to changes.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Yea, we could share the same implementation. I followed my rule that code should be shared if it represents the same thing by logical necessity, and otherwise not shared. In this case there are two very similar things that temporarily have the same implementation. I don't care too much; we could re-use the same lines of code for now until we choose to diverge. I have a slight preference for keeping them separate to make it obvious that there is no logical necessity that they be in sync.



##########
runners/flink/flink_runner.gradle:
##########
@@ -309,6 +311,8 @@ def createValidatesRunnerTask(Map m) {
 
         // Flink reshuffle override does not preserve all metadata
         excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
+        // Flink redistribute override does not preserve all metadata

Review Comment:
   Yea, I don't know what all is needed. I'll take another look and see if it is obvious.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java:
##########
@@ -42,6 +42,16 @@
 public class ReshuffleTranslator<K, InT, OutT>
     implements TransformTranslator<PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>>> {
 
+  private final String prefix;
+
+  ReshuffleTranslator(String prefix) {
+    this.prefix = prefix;
+  }
+
+  ReshuffleTranslator() {
+    this("rhfl-");

Review Comment:
   It was not! Thank you



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A family of {@link PTransform PTransforms} that returns a {@link PCollection} equivalent to its
+ * input but functions as an operational hint to a runner that redistributing the data in some way
+ * is likely useful.
+ */
+public class Redistribute {
+  /** @return a {@link RedistributeArbitrarily} transform with default configuration. */
+  public static <T> RedistributeArbitrarily<T> arbitrarily() {
+    return new RedistributeArbitrarily<>(null, false);
+  }
+
+  /** @return a {@link RedistributeByKey} transform with default configuration. */
+  public static <K, V> RedistributeByKey<K, V> byKey() {
+    return new RedistributeByKey<>(false);
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    private final boolean allowDuplicates;
+
+    private RedistributeByKey(boolean allowDuplicates) {
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeByKey<K, V> withAllowDuplicates(boolean newAllowDuplicates) {
+      return new RedistributeByKey<>(newAllowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+      // If the input has already had its windows merged, then the GBK that performed the merge
+      // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
+      // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+      // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+      // time.
+      // Because this outputs as fast as possible, this should not hold the watermark.
+      Window<KV<K, V>> rewindow =
+          Window.<KV<K, V>>into(
+                  new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+              .triggering(new ReshuffleTrigger<>())
+              .discardingFiredPanes()
+              .withTimestampCombiner(TimestampCombiner.EARLIEST)
+              .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+      PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+          input
+              .apply("SetIdentityWindow", rewindow)
+              .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+      PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+          reified.apply(GroupByKey.create());
+      return grouped
+          .apply(
+              "ExpandIterable",
+              ParDo.of(
+                  new DoFn<
+                      KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
+                    @ProcessElement
+                    public void processElement(
+                        @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+                        OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+                      K key = element.getKey();
+                      for (ValueInSingleWindow<V> value : element.getValue()) {
+                        r.output(KV.of(key, value));
+                      }
+                    }
+                  }))
+          .apply("RestoreMetadata", new RestoreMetadata<>())
+          // Set the windowing strategy directly, so that it doesn't get counted as the user having
+          // set allowed lateness.
+          .setWindowingStrategyInternal(originalStrategy);
+    }
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKeyAllowingDuplicates<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(Redistribute.byKey());
+    }
+  }
+
+  /**
+   * Noop transform that hints to the runner to try to redistribute the work evenly, or via whatever
+   * clever strategy the runner comes up with.
+   */
+  public static class RedistributeArbitrarily<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    // The number of buckets to shard into.
+    // A runner is free to ignore this (a runner may ignore the transform
+    // entirely!) This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets = null;
+    private boolean allowDuplicates = false;
+
+    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
+      this.numBuckets = numBuckets;
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+    }
+
+    public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
+      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
+          .apply(Redistribute.<Integer, T>byKey().withAllowDuplicates(this.allowDuplicates))
+          .apply(Values.create());
+    }
+  }
+
+  private static class RestoreMetadata<K, V>
+      extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+                @Override
+                public Duration getAllowedTimestampSkew() {
+                  return Duration.millis(Long.MAX_VALUE);
+                }
+
+                @ProcessElement
+                public void processElement(
+                    @Element KV<K, ValueInSingleWindow<V>> kv, OutputReceiver<KV<K, V>> r) {
+                  r.outputWindowedValue(
+                      KV.of(kv.getKey(), kv.getValue().getValue()),
+                      kv.getValue().getTimestamp(),
+                      Collections.singleton(kv.getValue().getWindow()),
+                      kv.getValue().getPane());
+                }
+              }));
+    }
+  }
+
+  public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
+    private int shard;
+    private @Nullable Integer numBuckets;
+
+    public AssignShardFn(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    @Setup
+    public void setup() {
+      shard = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
+      ++shard;
+      // Smear the shard into something more random-looking, to avoid issues
+      // with runners that don't properly hash the key being shuffled, but rely
+      // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
+      // which for Integer is a no-op and it is an issue:
+      // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
+      // spark.html
+      // This hashing strategy is copied from
+      // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
+      int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
+      if (numBuckets != null) {
+        UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);

Review Comment:
   I don't know! Better consult the history of `Reshuffle.java`...



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
+import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates the Redistribute transform into Samza's native partitionBy operator, which will partition
+ * each incoming message by the key into a Task corresponding to that key.
+ */
+public class RedistributeByKeyTranslator<K, V>
+    implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>>> {
+
+  private final ReshuffleTranslator<K, V, V> reshuffleTranslator =
+      new ReshuffleTranslator<>("rdstr-");
+
+  @Override
+  public void translate(
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+    reshuffleTranslator.translate(transform, node, ctx);
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    reshuffleTranslator.translatePortable(transform, pipeline, ctx);
+  }
+
+  /** Predicate to determine whether a URN is a Samza native transform. */
+  @AutoService(NativeTransforms.IsNativeTransform.class)
+  public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform {
+    @Override
+    public boolean test(RunnerApi.PTransform pTransform) {
+      return false;
+      // Re-enable after https://github.com/apache/beam/issues/21188 is completed

Review Comment:
   Haha I just copy/pasted the comment too. TBH I don't have context here and I'm going to leave the behavior but alter the comment...



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -119,6 +119,9 @@ public class PTransformTranslation {
   public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
   public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
   public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+  public static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";

Review Comment:
   Yes. I think all of them should migrate over there as well. I've added the new ones there.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2079227623

   BTW the test failures look like an issue with how GHA is selecting a JVM. We have precommits and postcommits passing.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2007801662

   The problem in the portable runner was that it was not registered as a "native" transform. I don't know the details, but this influences the fuser.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1523440543


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -422,6 +428,73 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.
+      // The output type coder may be relying on file system access. The shuffled data may have to
+      // be deserialized on a different machine using this coder where FileSystems has not been
+      // initialized.
+      final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet =
+          new MapOperator<>(
+              inputDataSet,
+              outputType,
+              FlinkIdentityFunction.of(),
+              getCurrentTransformName(context));
+      final Configuration partitionOptions = new Configuration();
+      partitionOptions.setString(
+          Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);

Review Comment:
   Just re-iterating that this is a copy-paste-modify of the Reshuffle logic. I didn't have them share logic because I only like to do that when it is a logical necessity, whereas these two transforms may at some point diverge (but it would be fine to make them share for now).



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -283,6 +287,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Would `keyBy` cause the `rebalance()` to use those keys when rebalancing? The current `Reshuffle` doesn't seem to do that.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1532980254


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   Both transforms are hints, just different hints. We could run the equivalent tests for both, to make sure the runner doesn't mess one up when it implements them directly. That is a good idea.





Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-1981548748

   One question which wasn't in the design docs was how to implement: wrap Reshuffle (aka build a composite that just invokes Reshuffle and relies on everything built around it) or fork. This PR chose to fork. Pro/con:
   
   Arguments for wrapping:
    - less code
    - runners that implement Reshuffle specially already will implement Redistribute the same way
    - if there is something I missed in how Reshuffle is treated, it will get picked up because we are still using it
   
   Arguments for forking:
    - decouple whatever state a runner may store, and just generally decouple their evolution
    - people won't unpack their "Redistribute" and see a Reshuffle inside and get the wrong idea
    - (minor) can remove update compatibility path
    - if there is something I missed in how Redistribute and Reshuffle are different, we are free for them to diverge
   
   So I chose forking but could be convinced otherwise. My way creates more code and more work, because we need to make runners treat it specially: for Dataflow we can do much better than the existing Reshuffle translation, and for the other runners it'll be a re-use of the existing Reshuffle translation.
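
   For context, here is a minimal sketch of what the wrapping alternative would look like for the by-key case (illustrative only, not code from this PR; the class name is made up):

   ```java
   // Hypothetical "wrap" variant: a composite whose expansion just delegates to the
   // existing (deprecated) Reshuffle, inheriting whatever runners already do for it.
   public static class RedistributeByKeyViaReshuffle<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
     @Override
     public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
       return input.apply(Reshuffle.of());
     }
   }
   ```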




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1581366814


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   I guess I was thinking in terms of portable runners. Here we have to have a special translator to set the `PropertyNames.ALLOW_DUPLICATES` property, right?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java:
##########
@@ -788,6 +852,8 @@ public String toNativeString() {
     EVALUATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, createPCollView());
     EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
     EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
+    EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, redistributeArbitrarily());

Review Comment:
   Same comment as for Flink (and below).



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Looks identical to the code above as well. I suppose my preference is not to duplicate code unless there's a good reason for divergence or it's difficult to avoid. E.g. above, why not just write
   
   ```
   translatorMap.put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, this::translateReshuffle);
   translatorMap.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, this::translateReshuffle);
   ```
   
   which makes it clear that these are all implemented the same way, rather than leaving the reader to pattern-match and work out what the differences, if any, are.
   
   It's not logical necessity, but saying "this (same) implementation is good enough for this use case as well."



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   But if you feel strongly, no need for this to be a blocker.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -162,6 +162,11 @@ class FlinkStreamingTransformTranslators {
         new CreateViewStreamingTranslator());
 
     TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorStreaming());
+    TRANSLATORS.put(

Review Comment:
   Same exact comment here.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2075672680

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/30545?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 68.56%. Comparing base [(`37609ba`)](https://app.codecov.io/gh/apache/beam/commit/37609ba70fab2216edc338121bf2f3a056a1e490?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`bdfc781`)](https://app.codecov.io/gh/apache/beam/pull/30545?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 11 commits behind head on master.
   
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #30545      +/-   ##
   ============================================
   + Coverage     68.51%   68.56%   +0.04%     
   - Complexity     2980     2981       +1     
   ============================================
     Files           352      352              
     Lines         27857    27857              
     Branches       3231     3231              
   ============================================
   + Hits          19087    19099      +12     
   + Misses         7305     7295      -10     
   + Partials       1465     1463       -2     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/30545/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [java](https://app.codecov.io/gh/apache/beam/pull/30545/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `68.56% <ø> (+0.04%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/30545?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   




Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1522088872


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   `Redistribute.byKey()` lets the user include keys as a hint for the runner. In both cases, the transform should have output equal to input.
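
   For illustration (not code from the PR; the helper names are made up), the API-level difference is only in what each variant accepts, since `byKey()` needs KVs whose keys can serve as the hint:

   ```java
   // Both hints leave the PCollection's contents unchanged.
   static PCollection<String> spreadArbitrarily(PCollection<String> lines) {
     // Any element type works; the runner may shard however it likes.
     return lines.apply(Redistribute.arbitrarily());
   }

   static PCollection<KV<String, Long>> spreadByKey(PCollection<KV<String, Long>> counts) {
     // Requires KVs; the existing keys are offered as the redistribution hint.
     return counts.apply(Redistribute.byKey());
   }
   ```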





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1523446942


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -422,6 +428,73 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.
+      // The output type coder may be relying on file system access. The shuffled data may have to
+      // be deserialized on a different machine using this coder where FileSystems has not been
+      // initialized.
+      final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet =
+          new MapOperator<>(
+              inputDataSet,
+              outputType,
+              FlinkIdentityFunction.of(),
+              getCurrentTransformName(context));
+      final Configuration partitionOptions = new Configuration();
+      partitionOptions.setString(
+          Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);

Review Comment:
   Yes, this is actually a bug in the current implementation. https://github.com/apache/beam/pull/30622





Re: [PR] Add Redistribute transform to Java SDK and Dataflow translation [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1521435648


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   I would suppose that `Redistribute.byKey()` has additional semantics; otherwise what would be the difference from `.arbitrarily()`? From my understanding, the post-redistribute PCollection has data with the same key co-located on the same partitions (workers). Is that not the case?





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1531103924


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -921,6 +926,38 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorStreaming<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>>> {
+
+    @Override
+    public void translateNode(
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
+      context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());

Review Comment:
   How about we do Reshuffle and Redistribute together in a follow-up PR? This one touches so much that it is a bit sensitive to being prodded. Currently it is at parity on Flink.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1531641777


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   Thanks! I still wonder whether we can cover the difference between `Redistribute.byKey()` and `.arbitrarily()` in tests. Or is this completely impossible? If it is impossible, why would one use the `.byKey()` variant?
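   
   For reference, a minimal roundtrip check could look like the sketch below (illustrative only, assuming the `Redistribute` API added in this PR and the `TestPipeline`/`PAssert` setup already used in this test class). It only verifies that elements are preserved; where elements land is a runner concern and is not observable from the test, so it does not by itself distinguish the two variants:
   
   ```
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();

   @Test
   public void testRedistributePreservesElements() {
     PCollection<KV<String, Integer>> input =
         pipeline.apply(
             Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));

     // Both variants are element-preserving, so the same assertion holds for each.
     PAssert.that(input.apply("ByKey", Redistribute.<String, Integer>byKey()))
         .containsInAnyOrder(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3));
     PAssert.that(input.apply("Arbitrarily", Redistribute.<KV<String, Integer>>arbitrarily()))
         .containsInAnyOrder(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3));

     pipeline.run();
   }
   ```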





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2009853881

   Hmm, actually seeing this:
   
   ```
   java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/github/benmanes/caffeine/cache/RemovalCause
   	at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache.create(ChannelCache.java:55)
   	at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer.create(GrpcWindmillServer.java:164)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.createWindmillServerStub(StreamingDataflowWorker.java:625)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.fromOptions(StreamingDataflowWorker.java:466)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.main(StreamingDataflowWorker.java:577)
   ```




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1583040940


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Made a long post. I very much view their current "sameness" as a coincidence that can diverge. I want to freeze Reshuffle where it is and have Redistribute be the place that evolves to be different (for example, incorporating more of Jan's ideas for Flink, and using new Dataflow features for Dataflow).





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles merged PR #30545:
URL: https://github.com/apache/beam/pull/30545




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2087706789

   I am opening a revert in a minute, but I am taking a look as well.
   
    - Samza PVR: this is an existing issue with Reshuffle, so sickbaying it is parity.
    - ULR: seems to have a completely wild, incorrect implementation (Redistribute is a valid composite that the ULR does nothing special with).
    - Flink PVR: this is a problem I actually fixed during iteration on the pull request but reverted per reviewer comments; I guess when I rebased, the trigger file no longer had a delta, so it looked green.
    - Spark and Spark 3 PVR: parity, and needs to be sickbayed.
   
   I think I want to roll forward, but without the runner translations. These are valid composites. When I first pushed the PR up, though, most runners failed, so there may be an issue with the composite definition.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #30545:
URL: https://github.com/apache/beam/pull/30545#issuecomment-2010761636

   Rebased past the failure at HEAD just to get a fully green signal on this PR.




Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1532287597


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   I do want to do a custom translation, yeah. I think the best way to do it is probably a black-box Dataflow primitive rather than doing it client-side. Filed b/330519228 internally.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1523448238


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -283,6 +287,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Let's keep the streaming implementation as is.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1525422965


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RedistributeTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Redistribute.AssignShardFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Redistribute}. */
+@RunWith(JUnit4.class)
+public class RedistributeTest implements Serializable {

Review Comment:
   For this PR I hit some snags in the portable runner in every case, so I just want to revert to copying Reshuffle, and we can fix it later. I updated it to match your new Reshuffle implementation, but only the one that you modified. The rest are just as they already were.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1531976044


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   Should we register a RedistributeArbitrarily translator as well? At least for streaming, producing entirely distinct bundles with entirely unique keys is rarely a good choice. Or add a TODO if that is out of scope for this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1538190898


##########
runners/flink/flink_runner.gradle:
##########
@@ -309,6 +311,8 @@ def createValidatesRunnerTask(Map m) {
 
         // Flink reshuffle override does not preserve all metadata
         excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
+        // Flink redistribute override does not preserve all metadata

Review Comment:
   This seems bad. Is it not possible to fix this (e.g. by reifying the metadata if needed)? I see that Reshuffle didn't, but ideally the new, improved transform should do things the right way.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Where is the key being used here? (This implementation looks identical to the one below.)



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A family of {@link PTransform PTransforms} that returns a {@link PCollection} equivalent to its
+ * input but functions as an operational hint to a runner that redistributing the data in some way
+ * is likely useful.
+ */
+public class Redistribute {
+  /** @return a {@link RedistributeArbitrarily} transform with default configuration. */
+  public static <T> RedistributeArbitrarily<T> arbitrarily() {
+    return new RedistributeArbitrarily<>(null, false);
+  }
+
+  /** @return a {@link RedistributeByKey} transform with default configuration. */
+  public static <K, V> RedistributeByKey<K, V> byKey() {
+    return new RedistributeByKey<>(false);
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    private final boolean allowDuplicates;
+
+    private RedistributeByKey(boolean allowDuplicates) {
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeByKey<K, V> withAllowDuplicates(boolean newAllowDuplicates) {
+      return new RedistributeByKey<>(newAllowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+      // If the input has already had its windows merged, then the GBK that performed the merge
+      // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
+      // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+      // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+      // time.
+      // Because this outputs as fast as possible, this should not hold the watermark.
+      Window<KV<K, V>> rewindow =
+          Window.<KV<K, V>>into(
+                  new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+              .triggering(new ReshuffleTrigger<>())
+              .discardingFiredPanes()
+              .withTimestampCombiner(TimestampCombiner.EARLIEST)
+              .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+      PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+          input
+              .apply("SetIdentityWindow", rewindow)
+              .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+      PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+          reified.apply(GroupByKey.create());
+      return grouped
+          .apply(
+              "ExpandIterable",
+              ParDo.of(
+                  new DoFn<
+                      KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
+                    @ProcessElement
+                    public void processElement(
+                        @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+                        OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+                      K key = element.getKey();
+                      for (ValueInSingleWindow<V> value : element.getValue()) {
+                        r.output(KV.of(key, value));
+                      }
+                    }
+                  }))
+          .apply("RestoreMetadata", new RestoreMetadata<>())
+          // Set the windowing strategy directly, so that it doesn't get counted as the user having
+          // set allowed lateness.
+          .setWindowingStrategyInternal(originalStrategy);
+    }
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKeyAllowingDuplicates<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(Redistribute.byKey());
+    }
+  }
+
+  /**
+   * Noop transform that hints to the runner to try to redistribute the work evenly, or via whatever
+   * clever strategy the runner comes up with.
+   */
+  public static class RedistributeArbitrarily<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    // The number of buckets to shard into.
+    // A runner is free to ignore this (a runner may ignore the transform
+    // entirely!) This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets = null;
+    private boolean allowDuplicates = false;
+
+    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
+      this.numBuckets = numBuckets;
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+    }
+
+    public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
+      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
+          .apply(Redistribute.<Integer, T>byKey().withAllowDuplicates(this.allowDuplicates))
+          .apply(Values.create());
+    }
+  }
+
+  private static class RestoreMetadata<K, V>
+      extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+                @Override
+                public Duration getAllowedTimestampSkew() {
+                  return Duration.millis(Long.MAX_VALUE);
+                }
+
+                @ProcessElement
+                public void processElement(
+                    @Element KV<K, ValueInSingleWindow<V>> kv, OutputReceiver<KV<K, V>> r) {
+                  r.outputWindowedValue(
+                      KV.of(kv.getKey(), kv.getValue().getValue()),
+                      kv.getValue().getTimestamp(),
+                      Collections.singleton(kv.getValue().getWindow()),
+                      kv.getValue().getPane());
+                }
+              }));
+    }
+  }
+
+  public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
+    private int shard;
+    private @Nullable Integer numBuckets;
+
+    public AssignShardFn(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    @Setup
+    public void setup() {
+      shard = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
+      ++shard;
+      // Smear the shard into something more random-looking, to avoid issues
+      // with runners that don't properly hash the key being shuffled, but rely
+      // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
+      // which for Integer is a no-op and it is an issue:
+      // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
+      // spark.html
+      // This hashing strategy is copied from
+      // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
+      int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
+      if (numBuckets != null) {
+        UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);

Review Comment:
   Why not just do something like Math.abs(hashOfShard) % numBuckets?
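   
   For context, a standalone sketch (not taken from the PR) of the edge case that makes a plain `Math.abs(...) % numBuckets` subtle: `Math.abs(Integer.MIN_VALUE)` overflows and stays negative, so the remainder can be negative, whereas `Math.floorMod` or an unsigned modulo always lands in `[0, numBuckets)`:
   
   ```
   // Assumes the vendored Guava UnsignedInteger import quoted above in Redistribute.java.
   int hashOfShard = Integer.MIN_VALUE; // worst-case hash value
   int numBuckets = 7;                  // example bucket count

   // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, and Java's % takes
   // the sign of the dividend, so this yields a negative "bucket".
   System.out.println(Math.abs(hashOfShard) % numBuckets);     // -2

   // Math.floorMod always returns a value in [0, numBuckets) for a positive modulus.
   System.out.println(Math.floorMod(hashOfShard, numBuckets)); // 5

   // Treating the bits as unsigned (as the UnsignedInteger line above suggests) also
   // stays non-negative.
   System.out.println(
       UnsignedInteger.fromIntBits(hashOfShard)
           .mod(UnsignedInteger.fromIntBits(numBuckets))
           .intValue());                                       // 2
   ```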



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java:
##########
@@ -42,6 +42,16 @@
 public class ReshuffleTranslator<K, InT, OutT>
     implements TransformTranslator<PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>>> {
 
+  private final String prefix;
+
+  ReshuffleTranslator(String prefix) {
+    this.prefix = prefix;
+  }
+
+  ReshuffleTranslator() {
+    this("rhfl-");

Review Comment:
   Was the dropping of the s in "rshfl" intentional?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -423,6 +431,76 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.

Review Comment:
   Is there an upstream bug or other reference about why this hack is needed?



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   Is there a reason not to simply retain the existing composite (Reshuffle) implementation, which can then be detected and overridden by any runner?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -119,6 +119,9 @@ public class PTransformTranslation {
   public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
   public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
   public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+  public static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";

Review Comment:
   Should these get added as constants in the proto files?



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto:
##########
@@ -813,6 +813,10 @@ message GroupIntoBatchesPayload {
   int64 max_buffering_duration_millis = 2;
 }
 
+message RedistributePayload {

Review Comment:
   Looks fine to me.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
+import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates Reshuffle transform into Samza's native partitionBy operator, which will partition
+ * each incoming message by the key into a Task corresponding to that key.
+ */
+public class RedistributeByKeyTranslator<K, V>
+    implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>>> {
+
+  private final ReshuffleTranslator<K, V, V> reshuffleTranslator =
+      new ReshuffleTranslator<>("rdstr-");
+
+  @Override
+  public void translate(
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+    reshuffleTranslator.translate(transform, node, ctx);
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    reshuffleTranslator.translatePortable(transform, pipeline, ctx);
+  }
+
+  /** Predicate to determine whether a URN is a Samza native transform. */
+  @AutoService(NativeTransforms.IsNativeTransform.class)
+  public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform {
+    @Override
+    public boolean test(RunnerApi.PTransform pTransform) {
+      return false;
+      // Re-enable after https://github.com/apache/beam/issues/21188 is completed

Review Comment:
   Isn't this bug what this PR is attempting to address?





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1578127727


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -423,6 +431,76 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.

Review Comment:
   TBH I'd rather keep that investigation separate so we can get the Redistribute transform working.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -119,6 +119,9 @@ public class PTransformTranslation {
   public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
   public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
   public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+  public static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";

Review Comment:
   I'll do a separate change to perhaps migrate these constants to use the constants in the proto file.





Re: [PR] Add Redistribute transform to model, Java SDK, and most active runners [beam]

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1578071267


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java:
##########
@@ -42,6 +42,16 @@
 public class ReshuffleTranslator<K, InT, OutT>
     implements TransformTranslator<PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>>> {
 
+  private final String prefix;
+
+  ReshuffleTranslator(String prefix) {
+    this.prefix = prefix;
+  }
+
+  ReshuffleTranslator() {
+    this("rhfl-");

Review Comment:
   fixed



##########
runners/flink/flink_runner.gradle:
##########
@@ -309,6 +311,8 @@ def createValidatesRunnerTask(Map m) {
 
         // Flink reshuffle override does not preserve all metadata
         excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
+        // Flink redistribute override does not preserve all metadata

Review Comment:
   OK, I re-added it and it succeeded. I think I added this exclusion during debugging. The non-portable runner works, if I have invoked the tests correctly. I'll watch the CI results.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
+import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates Reshuffle transform into Samza's native partitionBy operator, which will partition
+ * each incoming message by the key into a Task corresponding to that key.
+ */
+public class RedistributeByKeyTranslator<K, V>
+    implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>>> {
+
+  private final ReshuffleTranslator<K, V, V> reshuffleTranslator =
+      new ReshuffleTranslator<>("rdstr-");
+
+  @Override
+  public void translate(
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+    reshuffleTranslator.translate(transform, node, ctx);
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    reshuffleTranslator.translatePortable(transform, pipeline, ctx);
+  }
+
+  /** Predicate to determine whether a URN is a Samza native transform. */
+  @AutoService(NativeTransforms.IsNativeTransform.class)
+  public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform {
+    @Override
+    public boolean test(RunnerApi.PTransform pTransform) {
+      return false;
+      // Re-enable after https://github.com/apache/beam/issues/21188 is completed

Review Comment:
   Just removed this comment. This is simply not a native transform; the "native transform" mechanism is a hack someone added to support having transforms that are not a Beam primitive.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A family of {@link PTransform PTransforms} that returns a {@link PCollection} equivalent to its
+ * input but functions as an operational hint to a runner that redistributing the data in some way
+ * is likely useful.
+ */
+public class Redistribute {
+  /** @return a {@link RedistributeArbitrarily} transform with default configuration. */
+  public static <T> RedistributeArbitrarily<T> arbitrarily() {
+    return new RedistributeArbitrarily<>(null, false);
+  }
+
+  /** @return a {@link RedistributeByKey} transform with default configuration. */
+  public static <K, V> RedistributeByKey<K, V> byKey() {
+    return new RedistributeByKey<>(false);
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    private final boolean allowDuplicates;
+
+    private RedistributeByKey(boolean allowDuplicates) {
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeByKey<K, V> withAllowDuplicates(boolean newAllowDuplicates) {
+      return new RedistributeByKey<>(newAllowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+      // If the input has already had its windows merged, then the GBK that performed the merge
+      // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
+      // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+      // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+      // time.
+      // Because this outputs as fast as possible, this should not hold the watermark.
+      Window<KV<K, V>> rewindow =
+          Window.<KV<K, V>>into(
+                  new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+              .triggering(new ReshuffleTrigger<>())
+              .discardingFiredPanes()
+              .withTimestampCombiner(TimestampCombiner.EARLIEST)
+              .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+      PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+          input
+              .apply("SetIdentityWindow", rewindow)
+              .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+      PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+          reified.apply(GroupByKey.create());
+      return grouped
+          .apply(
+              "ExpandIterable",
+              ParDo.of(
+                  new DoFn<
+                      KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
+                    @ProcessElement
+                    public void processElement(
+                        @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+                        OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+                      K key = element.getKey();
+                      for (ValueInSingleWindow<V> value : element.getValue()) {
+                        r.output(KV.of(key, value));
+                      }
+                    }
+                  }))
+          .apply("RestoreMetadata", new RestoreMetadata<>())
+          // Set the windowing strategy directly, so that it doesn't get counted as the user having
+          // set allowed lateness.
+          .setWindowingStrategyInternal(originalStrategy);
+    }
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKeyAllowingDuplicates<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(Redistribute.byKey());
+    }
+  }
+
+  /**
+   * Noop transform that hints to the runner to try to redistribute the work evenly, or via whatever
+   * clever strategy the runner comes up with.
+   */
+  public static class RedistributeArbitrarily<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    // The number of buckets to shard into.
+    // A runner is free to ignore this (a runner may ignore the transform
+    // entirely!) This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets = null;
+    private boolean allowDuplicates = false;
+
+    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
+      this.numBuckets = numBuckets;
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+    }
+
+    public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
+      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
+          .apply(Redistribute.<Integer, T>byKey().withAllowDuplicates(this.allowDuplicates))
+          .apply(Values.create());
+    }
+  }
+
+  private static class RestoreMetadata<K, V>
+      extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+                @Override
+                public Duration getAllowedTimestampSkew() {
+                  return Duration.millis(Long.MAX_VALUE);
+                }
+
+                @ProcessElement
+                public void processElement(
+                    @Element KV<K, ValueInSingleWindow<V>> kv, OutputReceiver<KV<K, V>> r) {
+                  r.outputWindowedValue(
+                      KV.of(kv.getKey(), kv.getValue().getValue()),
+                      kv.getValue().getTimestamp(),
+                      Collections.singleton(kv.getValue().getWindow()),
+                      kv.getValue().getPane());
+                }
+              }));
+    }
+  }
+
+  public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
+    private int shard;
+    private @Nullable Integer numBuckets;
+
+    public AssignShardFn(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    @Setup
+    public void setup() {
+      shard = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
+      ++shard;
+      // Smear the shard into something more random-looking, to avoid issues
+      // with runners that don't properly hash the key being shuffled, but rely
+      // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
+      // which for Integer is a no-op and it is an issue:
+      // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
+      // spark.html
+      // This hashing strategy is copied from
+      // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
+      int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
+      if (numBuckets != null) {
+        UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);

Review Comment:
   In other words, I'm just trying not to rock the boat. I think this is update-compatible in the sense that if we later change the shard assignment for Redistribute.arbitrarily(), we are fine.


