You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:54 UTC
[beam] 32/50: Improve exception flow
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit a3a87b49d589061c280cfc982a85ec1f85dd0138
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Dec 11 16:00:26 2018 +0100
Improve exception flow
---
.../spark/structuredstreaming/translation/io/DatasetSource.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 75cdd5d..d23ecf3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -137,6 +136,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
try {
reader = source.createReader(options);
} catch (IOException e) {
+ throw new RuntimeException(
+ "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
}
return new DatasetMicroBatchPartitionReader(reader);
}
@@ -145,9 +146,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
return result;
} catch (Exception e) {
- e.printStackTrace();
+ throw new RuntimeException(
+ "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
}
- return result;
}
}