You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:54 UTC

[beam] 32/50: Improve exception flow

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a3a87b49d589061c280cfc982a85ec1f85dd0138
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Dec 11 16:00:26 2018 +0100

    Improve exception flow
---
 .../spark/structuredstreaming/translation/io/DatasetSource.java    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 75cdd5d..d23ecf3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -137,6 +136,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
                   try {
                     reader = source.createReader(options);
                   } catch (IOException e) {
+                    throw new RuntimeException(
+                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
                   }
                   return new DatasetMicroBatchPartitionReader(reader);
                 }
@@ -145,9 +146,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
         return result;
 
       } catch (Exception e) {
-        e.printStackTrace();
+        throw new RuntimeException(
+            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
       }
-      return result;
     }
   }