You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2018/12/07 15:10:32 UTC
[beam] branch spark-runner_structured-streaming updated: Add primitive GroupByKeyTranslatorBatch implementation
This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new 7b00f7c Add primitive GroupByKeyTranslatorBatch implementation
7b00f7c is described below
commit 7b00f7c6a8ef37e42d741b6954a6e9b87ea8fea7
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Fri Dec 7 10:54:12 2018 +0100
Add primitive GroupByKeyTranslatorBatch implementation
---
...KeyTranslatorBatch.java => EncoderHelpers.java} | 22 ++++------
.../translation/TranslationContext.java | 4 +-
.../batch/GroupByKeyTranslatorBatch.java | 49 ++++++++++++++++++++--
3 files changed, 56 insertions(+), 19 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java
similarity index 56%
copy from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
copy to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java
index 4ee77fb..4c56922 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java
@@ -15,20 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+package org.apache.beam.runners.spark.structuredstreaming.translation;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
-class GroupByKeyTranslatorBatch<K, InputT>
- implements TransformTranslator<
- PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
+/** {@link Encoders} utility class. */
+public class EncoderHelpers {
- @Override
- public void translateTransform(
- PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
- TranslationContext context) {}
+ @SuppressWarnings("unchecked")
+ public static <T> Encoder<T> encoder() {
+ return Encoders.kryo((Class<T>) Object.class);
+ }
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 3c29867..e66bc90 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -46,9 +46,9 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
*/
public class TranslationContext {
- /** All the datasets of the DAG */
+ /** All the datasets of the DAG. */
private final Map<PValue, Dataset<?>> datasets;
- /** datasets that are not used as input to other datasets (leaves of the DAG) */
+ /** datasets that are not used as input to other datasets (leaves of the DAG). */
private final Set<Dataset<?>> leaves;
private final SparkPipelineOptions options;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 4ee77fb..7f2d7fa 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -17,18 +17,59 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.KeyValueGroupedDataset;
-class GroupByKeyTranslatorBatch<K, InputT>
+class GroupByKeyTranslatorBatch<K, V>
implements TransformTranslator<
- PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
@Override
public void translateTransform(
- PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
- TranslationContext context) {}
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
+ TranslationContext context) {
+
+ Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(context.getInput());
+
+ // group by key only.
+ KeyValueGroupedDataset<K, KV<K, V>> grouped =
+ input
+ .map(
+ (MapFunction<WindowedValue<KV<K, V>>, KV<K, V>>) WindowedValue::getValue,
+ EncoderHelpers.encoder())
+ .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.<K>encoder());
+
+ Dataset<KV<K, Iterable<V>>> materialized =
+ grouped.mapGroups(
+ (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>)
+ (key, iterator) -> {
+ // TODO: can we use here just "Iterable<V> iterable = () -> iterator;" ?
+ List<V> values = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ values.add(iterator.next().getValue());
+ }
+ return KV.of(key, Iterables.unmodifiableIterable(values));
+ },
+ EncoderHelpers.encoder());
+
+ Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
+ materialized.map(
+ (MapFunction<KV<K, Iterable<V>>, WindowedValue<KV<K, Iterable<V>>>>)
+ WindowedValue::valueInGlobalWindow,
+ EncoderHelpers.encoder());
+
+ context.putDataset(context.getOutput(), output);
+ }
}