You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/05 14:48:16 UTC
[beam] 23/24: Apply spotless and checkstyle and add javadocs
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit b87912300c863af606933a03ca790e8e94ed05f1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Sep 5 15:35:34 2019 +0200
Apply spotless and checkstyle and add javadocs
---
.../translation/helpers/EncoderHelpers.java | 137 +++++++++++++--------
.../structuredstreaming/utils/EncodersTest.java | 32 +++--
2 files changed, 113 insertions(+), 56 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index c9ab435..f990121 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -89,21 +89,31 @@ public class EncoderHelpers {
--------- Bridges from Beam Coders to Spark Encoders
*/
- /** A way to construct encoders using generic serializers. */
- public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder/*, Class<T> claz*/){
+ /**
+ * Wrap a Beam coder into a Spark Encoder using Catalyst Expression Encoders (which uses java code
+ * generation).
+ */
+ public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder) {
List<Expression> serialiserList = new ArrayList<>();
Class<T> claz = (Class<T>) Object.class;
- serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder));
+ serialiserList.add(
+ new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder));
ClassTag<T> classTag = ClassTag$.MODULE$.apply(claz);
return new ExpressionEncoder<>(
SchemaHelpers.binarySchema(),
false,
JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(),
- new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder),
+ new DecodeUsingBeamCoder<>(
+ new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder),
classTag);
}
+ /**
+ * Catalyst Expression that serializes elements using Beam {@link Coder}.
+ *
* @param <T>: Type of elements to be serialized.
+ */
public static class EncodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression {
private Expression child;
@@ -114,13 +124,16 @@ public class EncoderHelpers {
this.beamCoder = beamCoder;
}
- @Override public Expression child() {
+ @Override
+ public Expression child() {
return child;
}
- @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
+ @Override
+ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
// Code to serialize.
- String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
+ String accessCode =
+ ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
ExprCode input = child.genCode(ctx);
/*
@@ -140,14 +153,17 @@ public class EncoderHelpers {
*/
List<String> parts = new ArrayList<>();
parts.add("byte[] ");
- parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if (");
+ parts.add(
+ ";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if (");
parts.add(") ");
parts.add(" = null; else{");
parts.add(".encode(");
parts.add(", baos); ");
- parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
+ parts.add(
+ " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
- StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
+ StringContext sc =
+ new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
List<Object> args = new ArrayList<>();
@@ -157,18 +173,19 @@ public class EncoderHelpers {
args.add(accessCode);
args.add(input.value());
args.add(ev.value());
- Block code = (new Block.BlockHelper(sc))
- .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
+ Block code =
+ (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
- return ev.copy(input.code().$plus(code), input.isNull(),ev.value());
+ return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
}
-
- @Override public DataType dataType() {
+ @Override
+ public DataType dataType() {
return BinaryType;
}
- @Override public Object productElement(int n) {
+ @Override
+ public Object productElement(int n) {
switch (n) {
case 0:
return child;
@@ -179,15 +196,18 @@ public class EncoderHelpers {
}
}
- @Override public int productArity() {
+ @Override
+ public int productArity() {
return 2;
}
- @Override public boolean canEqual(Object that) {
+ @Override
+ public boolean canEqual(Object that) {
return (that instanceof EncodeUsingBeamCoder);
}
- @Override public boolean equals(Object o) {
+ @Override
+ public boolean equals(Object o) {
if (this == o) {
return true;
}
@@ -198,12 +218,18 @@ public class EncoderHelpers {
return beamCoder.equals(that.beamCoder) && child.equals(that.child);
}
- @Override public int hashCode() {
+ @Override
+ public int hashCode() {
return Objects.hash(super.hashCode(), child, beamCoder);
}
}
- public static class DecodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression{
+ /**
+ * Catalyst Expression that deserializes elements using Beam {@link Coder}.
+ *
+ * @param <T>: Type of elements to be deserialized.
+ */
+ public static class DecodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression {
private Expression child;
private ClassTag<T> classTag;
@@ -215,28 +241,31 @@ public class EncoderHelpers {
this.beamCoder = beamCoder;
}
- @Override public Expression child() {
+ @Override
+ public Expression child() {
return child;
}
- @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
+ @Override
+ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
// Code to deserialize.
- String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
+ String accessCode =
+ ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName());
ExprCode input = child.genCode(ctx);
String javaType = CodeGenerator.javaType(dataType());
-/*
- CODE GENERATED:
- final $javaType ${ev.value}
- try {
- ${ev.value} =
- ${input.isNull} ?
- ${CodeGenerator.defaultValue(dataType)} :
- ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value}));
- } catch (Exception e) {
- throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
- }
-*/
+ /*
+ CODE GENERATED:
+ final $javaType ${ev.value}
+ try {
+ ${ev.value} =
+ ${input.isNull} ?
+ ${CodeGenerator.defaultValue(dataType)} :
+ ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value}));
+ } catch (Exception e) {
+ throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
+ }
+ */
List<String> parts = new ArrayList<>();
parts.add("final ");
@@ -247,9 +276,11 @@ public class EncoderHelpers {
parts.add(": (");
parts.add(") ");
parts.add(".decode(new java.io.ByteArrayInputStream(");
- parts.add(")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
+ parts.add(
+ ")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
- StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
+ StringContext sc =
+ new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
List<Object> args = new ArrayList<>();
args.add(javaType);
@@ -260,14 +291,14 @@ public class EncoderHelpers {
args.add(javaType);
args.add(accessCode);
args.add(input.value());
- Block code = (new Block.BlockHelper(sc))
- .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
+ Block code =
+ (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
-
}
- @Override public Object nullSafeEval(Object input) {
+ @Override
+ public Object nullSafeEval(Object input) {
try {
return beamCoder.decode(new ByteArrayInputStream((byte[]) input));
} catch (Exception e) {
@@ -275,11 +306,13 @@ public class EncoderHelpers {
}
}
- @Override public DataType dataType() {
+ @Override
+ public DataType dataType() {
return new ObjectType(classTag.runtimeClass());
}
- @Override public Object productElement(int n) {
+ @Override
+ public Object productElement(int n) {
switch (n) {
case 0:
return child;
@@ -292,15 +325,18 @@ public class EncoderHelpers {
}
}
- @Override public int productArity() {
+ @Override
+ public int productArity() {
return 3;
}
- @Override public boolean canEqual(Object that) {
+ @Override
+ public boolean canEqual(Object that) {
return (that instanceof DecodeUsingBeamCoder);
}
- @Override public boolean equals(Object o) {
+ @Override
+ public boolean equals(Object o) {
if (this == o) {
return true;
}
@@ -308,10 +344,13 @@ public class EncoderHelpers {
return false;
}
DecodeUsingBeamCoder<?> that = (DecodeUsingBeamCoder<?>) o;
- return child.equals(that.child) && classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder);
+ return child.equals(that.child)
+ && classTag.equals(that.classTag)
+ && beamCoder.equals(that.beamCoder);
}
- @Override public int hashCode() {
+ @Override
+ public int hashCode() {
return Objects.hash(super.hashCode(), child, classTag, beamCoder);
}
}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
index c6b8631..8327fd8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.beam.runners.spark.structuredstreaming.utils;
import static org.junit.Assert.assertEquals;
@@ -12,22 +29,23 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+/** Test of the wrapping of Beam Coders as Spark ExpressionEncoders. */
@RunWith(JUnit4.class)
-/**
- * Test of the wrapping of Beam Coders as Spark ExpressionEncoders.
- */
public class EncodersTest {
@Test
public void beamCoderToSparkEncoderTest() {
- SparkSession sparkSession = SparkSession.builder().appName("beamCoderToSparkEncoderTest")
- .master("local[4]").getOrCreate();
+ SparkSession sparkSession =
+ SparkSession.builder()
+ .appName("beamCoderToSparkEncoderTest")
+ .master("local[4]")
+ .getOrCreate();
List<Integer> data = new ArrayList<>();
data.add(1);
data.add(2);
data.add(3);
- Dataset<Integer> dataset = sparkSession
- .createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
+ Dataset<Integer> dataset =
+ sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
List<Integer> results = dataset.collectAsList();
assertEquals(data, results);
}