Posted to commits@beam.apache.org by "Aviem Zur (JIRA)" <ji...@apache.org> on 2017/07/24 08:32:00 UTC
[jira] [Updated] (BEAM-2669) Kryo serialization exception when DStreams containing non-Kryo-serializable data are cached
[ https://issues.apache.org/jira/browse/BEAM-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aviem Zur updated BEAM-2669:
----------------------------
Description:
Today, when the Spark runner detects re-use of a dataset within a pipeline, it eagerly caches that dataset to avoid computing the same data multiple times.
([EvaluationContext.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L148])
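For illustration, a minimal sketch of that re-use detection, assuming a hypothetical consumer-counting helper (the class and method names below are illustrative, not the actual {{EvaluationContext}} internals):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the eager-caching decision; the real logic lives in EvaluationContext.
class CacheTracker {
  private final Map<String, Integer> consumers = new HashMap<>();

  // Called once for every transform that reads the given dataset.
  void registerConsumer(String datasetId) {
    consumers.merge(datasetId, 1, Integer::sum);
  }

  // A dataset read by more than one transform is cached eagerly.
  boolean shouldCache(String datasetId) {
    return consumers.getOrDefault(datasetId, 0) > 1;
  }
}
{code}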
When the dataset is bounded, which the Spark runner represents as an {{RDD}}, we call {{RDD#persist}} with the storage level provided by the user via {{SparkPipelineOptions}}. ([BoundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java#L103-L103])
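For reference, the bounded path amounts to roughly the following (a sketch, assuming the storage level arrives as a string from {{SparkPipelineOptions}}, as the linked {{BoundedDataset}} code suggests):
{code:java}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

// Sketch of the bounded (RDD) caching path: the storage level is taken from the user's options.
class BoundedCaching {
  static <T> JavaRDD<T> cache(JavaRDD<T> rdd, String storageLevelFromOptions) {
    // e.g. storageLevelFromOptions = options.getStorageLevel(), typically "MEMORY_ONLY"
    return rdd.persist(StorageLevel.fromString(storageLevelFromOptions));
  }
}
{code}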
When the dataset is unbounded, which the Spark runner represents as a {{DStream}}, we call {{DStream.cache()}}, which by default persists the {{DStream}} using storage level {{MEMORY_ONLY_SER}}. ([UnboundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java#L61])
([DStream.scala|https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L169])
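The unbounded path, by contrast, relies on {{cache()}}'s default. A sketch of today's behavior (illustrative class name, not the actual {{UnboundedDataset}} code):
{code:java}
import org.apache.spark.streaming.api.java.JavaDStream;

// Sketch of the unbounded (DStream) caching path as it behaves today.
class UnboundedCaching {
  static <T> void cacheUnbounded(JavaDStream<T> dstream) {
    // cache() persists with MEMORY_ONLY_SER, so every cached element
    // passes through Spark's configured serializer.
    dstream.cache();
  }
}
{code}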
Storage level {{MEMORY_ONLY_SER}} means Spark serializes the cached data using its configured serializer. Since we hard-code this serializer to Kryo, the data will always be serialized using Kryo. ([SparkContextFactory.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L99-L99])
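The hard-coded serializer setting amounts to something like the line below (a sketch of the relevant configuration, not a verbatim copy of {{SparkContextFactory}}):
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;

// Sketch: the runner unconditionally forces Kryo as Spark's serializer,
// overriding whatever the user may have configured.
class SerializerConfig {
  static SparkConf withForcedKryo(SparkConf conf) {
    return conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
  }
}
{code}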
As a result, if your {{DStream}} contains non-Kryo-serializable data, you will encounter Kryo serialization exceptions and your tasks will fail.
Possible actions we should consider:
# Remove the hard-coded Spark serializer configuration; it should come from the user's Spark configuration, and there is no real reason for us to interfere with it.
# Use the storage level the user configured in {{SparkPipelineOptions}} when caching unbounded datasets ({{DStream}}s), just as we do for bounded datasets (see the sketch after this list).
# Make caching of re-used datasets configurable (enable/disable) in {{SparkPipelineOptions}}, although overloading our configuration with more options is not something to take lightly.
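As a rough sketch of what actions 2 and 3 could look like for {{DStream}}s (the {{cacheDisabled}} flag is hypothetical and does not exist in {{SparkPipelineOptions}} today):
{code:java}
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;

// Sketch of possible fixes: honor the user's storage level for DStreams (action 2)
// and allow eager caching of re-used datasets to be switched off (action 3).
class ProposedCaching {
  static <T> void maybeCache(JavaDStream<T> dstream,
                             String storageLevelFromOptions, // e.g. options.getStorageLevel()
                             boolean cacheDisabled) {        // hypothetical enable/disable option
    if (cacheDisabled) {
      return; // action 3: let the user opt out of eager caching altogether
    }
    // action 2: same user-provided storage level handling as the bounded (RDD) path
    dstream.persist(StorageLevel.fromString(storageLevelFromOptions));
  }
}
{code}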
> Kryo serialization exception when DStreams containing non-Kryo-serializable data are cached
> -------------------------------------------------------------------------------------------
>
> Key: BEAM-2669
> URL: https://issues.apache.org/jira/browse/BEAM-2669
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 0.4.0, 0.5.0, 0.6.0, 2.0.0
> Reporter: Aviem Zur
> Assignee: Amit Sela
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)