You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/05 14:48:09 UTC
[beam] 16/24: Add try catch around reflexion call in lazy init of
beam coder
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 723c004dcdf82073568dc1a758d8d7453a6af954
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Sep 5 10:13:05 2019 +0200
Add try catch around reflexion call in lazy init of beam coder
---
.../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 56097b7..05595f1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -388,12 +388,16 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B
ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> {
/*
CODE GENERATED
+ try {
v1 = coderClass.class.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
+ }
*/
List<String> parts = new ArrayList<>();
- parts.add("");
+ parts.add("try {");
parts.add(" = ");
- parts.add(".class.getDeclaredConstructor().newInstance();");
+ parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
List<Object> args = new ArrayList<>();
args.add(v1);