You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/05 14:48:16 UTC

[beam] 23/24: Apply spotless and checkstyle and add javadocs

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b87912300c863af606933a03ca790e8e94ed05f1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Sep 5 15:35:34 2019 +0200

    Apply spotless and checkstyle and add javadocs
---
 .../translation/helpers/EncoderHelpers.java        | 137 +++++++++++++--------
 .../structuredstreaming/utils/EncodersTest.java    |  32 +++--
 2 files changed, 113 insertions(+), 56 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index c9ab435..f990121 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -89,21 +89,31 @@ public class EncoderHelpers {
    --------- Bridges from Beam Coders to Spark Encoders
   */
 
-  /** A way to construct encoders using generic serializers. */
-  public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder/*, Class<T> claz*/){
+  /**
+   * Wrap a Beam coder into a Spark Encoder using Catalyst Expression Encoders (which use Java code
+   * generation).
+   */
+  public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder) {
 
     List<Expression> serialiserList = new ArrayList<>();
     Class<T> claz = (Class<T>) Object.class;
-    serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder));
+    serialiserList.add(
+        new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder));
     ClassTag<T> classTag = ClassTag$.MODULE$.apply(claz);
     return new ExpressionEncoder<>(
         SchemaHelpers.binarySchema(),
         false,
         JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(),
-        new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder),
+        new DecodeUsingBeamCoder<>(
+            new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder),
         classTag);
   }
 
+  /**
+   * Catalyst Expression that serializes elements using Beam {@link Coder}.
+   *
+   * @param <T>: Type of elements to be serialized.
+   */
   public static class EncodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression {
 
     private Expression child;
@@ -114,13 +124,16 @@ public class EncoderHelpers {
       this.beamCoder = beamCoder;
     }
 
-    @Override public Expression child() {
+    @Override
+    public Expression child() {
       return child;
     }
 
-    @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
+    @Override
+    public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
       // Code to serialize.
-      String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
+      String accessCode =
+          ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
       ExprCode input = child.genCode(ctx);
 
       /*
@@ -140,14 +153,17 @@ public class EncoderHelpers {
       */
       List<String> parts = new ArrayList<>();
       parts.add("byte[] ");
-      parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if (");
+      parts.add(
+          ";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if (");
       parts.add(") ");
       parts.add(" = null; else{");
       parts.add(".encode(");
       parts.add(", baos); ");
-      parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
+      parts.add(
+          " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
 
-      StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
+      StringContext sc =
+          new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
 
       List<Object> args = new ArrayList<>();
 
@@ -157,18 +173,19 @@ public class EncoderHelpers {
       args.add(accessCode);
       args.add(input.value());
       args.add(ev.value());
-      Block code = (new Block.BlockHelper(sc))
-          .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
+      Block code =
+          (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
 
-      return ev.copy(input.code().$plus(code), input.isNull(),ev.value());
+      return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
     }
 
-
-    @Override public DataType dataType() {
+    @Override
+    public DataType dataType() {
       return BinaryType;
     }
 
-    @Override public Object productElement(int n) {
+    @Override
+    public Object productElement(int n) {
       switch (n) {
         case 0:
           return child;
@@ -179,15 +196,18 @@ public class EncoderHelpers {
       }
     }
 
-    @Override public int productArity() {
+    @Override
+    public int productArity() {
       return 2;
     }
 
-    @Override public boolean canEqual(Object that) {
+    @Override
+    public boolean canEqual(Object that) {
       return (that instanceof EncodeUsingBeamCoder);
     }
 
-    @Override public boolean equals(Object o) {
+    @Override
+    public boolean equals(Object o) {
       if (this == o) {
         return true;
       }
@@ -198,12 +218,18 @@ public class EncoderHelpers {
       return beamCoder.equals(that.beamCoder) && child.equals(that.child);
     }
 
-    @Override public int hashCode() {
+    @Override
+    public int hashCode() {
       return Objects.hash(super.hashCode(), child, beamCoder);
     }
   }
 
-  public static class DecodeUsingBeamCoder<T> extends UnaryExpression implements  NonSQLExpression{
+  /**
+   * Catalyst Expression that deserializes elements using Beam {@link Coder}.
+   *
+   * @param <T>: Type of elements to be deserialized.
+   */
+  public static class DecodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression {
 
     private Expression child;
     private ClassTag<T> classTag;
@@ -215,28 +241,31 @@ public class EncoderHelpers {
       this.beamCoder = beamCoder;
     }
 
-    @Override public Expression child() {
+    @Override
+    public Expression child() {
       return child;
     }
 
-    @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
+    @Override
+    public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
       // Code to deserialize.
-      String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
+      String accessCode =
+          ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
       ExprCode input = child.genCode(ctx);
       String javaType = CodeGenerator.javaType(dataType());
 
-/*
-     CODE GENERATED:
-     final $javaType ${ev.value}
-     try {
-      ${ev.value} =
-      ${input.isNull} ?
-      ${CodeGenerator.defaultValue(dataType)} :
-      ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value}));
-     } catch (Exception e) {
-      throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
-     }
-*/
+      /*
+           CODE GENERATED:
+           final $javaType ${ev.value}
+           try {
+            ${ev.value} =
+            ${input.isNull} ?
+            ${CodeGenerator.defaultValue(dataType)} :
+            ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value}));
+           } catch (Exception e) {
+            throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
+           }
+      */
 
       List<String> parts = new ArrayList<>();
       parts.add("final ");
@@ -247,9 +276,11 @@ public class EncoderHelpers {
       parts.add(": (");
       parts.add(") ");
       parts.add(".decode(new java.io.ByteArrayInputStream(");
-      parts.add("));  } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
+      parts.add(
+          "));  } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
 
-      StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
+      StringContext sc =
+          new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
 
       List<Object> args = new ArrayList<>();
       args.add(javaType);
@@ -260,14 +291,14 @@ public class EncoderHelpers {
       args.add(javaType);
       args.add(accessCode);
       args.add(input.value());
-      Block code = (new Block.BlockHelper(sc))
-          .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
+      Block code =
+          (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
 
       return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
-
     }
 
-    @Override public Object nullSafeEval(Object input) {
+    @Override
+    public Object nullSafeEval(Object input) {
       try {
         return beamCoder.decode(new ByteArrayInputStream((byte[]) input));
       } catch (Exception e) {
@@ -275,11 +306,13 @@ public class EncoderHelpers {
       }
     }
 
-    @Override public DataType dataType() {
+    @Override
+    public DataType dataType() {
       return new ObjectType(classTag.runtimeClass());
     }
 
-    @Override public Object productElement(int n) {
+    @Override
+    public Object productElement(int n) {
       switch (n) {
         case 0:
           return child;
@@ -292,15 +325,18 @@ public class EncoderHelpers {
       }
     }
 
-    @Override public int productArity() {
+    @Override
+    public int productArity() {
       return 3;
     }
 
-    @Override public boolean canEqual(Object that) {
+    @Override
+    public boolean canEqual(Object that) {
       return (that instanceof DecodeUsingBeamCoder);
     }
 
-    @Override public boolean equals(Object o) {
+    @Override
+    public boolean equals(Object o) {
       if (this == o) {
         return true;
       }
@@ -308,10 +344,13 @@ public class EncoderHelpers {
         return false;
       }
       DecodeUsingBeamCoder<?> that = (DecodeUsingBeamCoder<?>) o;
-      return child.equals(that.child) && classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder);
+      return child.equals(that.child)
+          && classTag.equals(that.classTag)
+          && beamCoder.equals(that.beamCoder);
     }
 
-    @Override public int hashCode() {
+    @Override
+    public int hashCode() {
       return Objects.hash(super.hashCode(), child, classTag, beamCoder);
     }
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
index c6b8631..8327fd8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.structuredstreaming.utils;
 
 import static org.junit.Assert.assertEquals;
@@ -12,22 +29,23 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+/** Test of the wrapping of Beam Coders as Spark ExpressionEncoders. */
 @RunWith(JUnit4.class)
-/**
- * Test of the wrapping of Beam Coders as Spark ExpressionEncoders.
- */
 public class EncodersTest {
 
   @Test
   public void beamCoderToSparkEncoderTest() {
-    SparkSession sparkSession = SparkSession.builder().appName("beamCoderToSparkEncoderTest")
-        .master("local[4]").getOrCreate();
+    SparkSession sparkSession =
+        SparkSession.builder()
+            .appName("beamCoderToSparkEncoderTest")
+            .master("local[4]")
+            .getOrCreate();
     List<Integer> data = new ArrayList<>();
     data.add(1);
     data.add(2);
     data.add(3);
-    Dataset<Integer> dataset = sparkSession
-        .createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
+    Dataset<Integer> dataset =
+        sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
     List<Integer> results = dataset.collectAsList();
     assertEquals(data, results);
   }