Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:21:16 UTC

[jira] [Updated] (SPARK-17152) Spark Flume sink fails with begin() called when transaction is OPEN

     [ https://issues.apache.org/jira/browse/SPARK-17152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-17152:
---------------------------------
    Labels: bulk-closed flume sink spark  (was: flume sink spark)

> Spark Flume sink fails with begin() called when transaction is OPEN
> -------------------------------------------------------------------
>
>                 Key: SPARK-17152
>                 URL: https://issues.apache.org/jira/browse/SPARK-17152
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>            Reporter: Wojciech Sznapka
>            Priority: Major
>              Labels: bulk-closed, flume, sink, spark
>
> I'm running Flume and Spark on two EC2 instances (one for Flume, one for Spark).
> Flume config:
> {code}
> agent.channels = ch1
> agent.channels.ch1.type = SPILLABLEMEMORY
> agent.channels.ch1.capacity = 10000
> agent.channels.ch1.transactionCapacity=10
> agent.sources = seq
> agent.sources.seq.type = seq
> agent.sources.seq.channels = ch1
> agent.sinks = spark
> agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
> agent.sinks.spark.hostname = 0.0.0.0
> agent.sinks.spark.port = 5005
> agent.sinks.spark.channel = ch1
> {code}
> I run Flume with:
> {code}./bin/flume-ng agent -Xmx512m --name agent -c conf/ -f rabbit.conf -Dflume.root.logger=INFO,console -C plugins.d/flume-ng-rabbitmq-master/target/flume-rabbitmq-channel-1.0-SNAPSHOT.jar:plugins.d/spark-streaming-flume-sink_2.11-2.0.0.jar{code}
> Then, from the other machine, I try to consume from the Spark sink:
> {code}spark-2.0.0-bin-hadoop2.7$ bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.0 flume_wordcount.py 52.210.48.242 5005{code}
> using a variation of flume_wordcount.py (one that uses the polling stream):
> {code}
> from __future__ import print_function
> import sys
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.flume import FlumeUtils
> if __name__ == "__main__":
>     if len(sys.argv) != 3:
>         print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
>         sys.exit(-1)
>     hostname, port = sys.argv[1:]
>     sc = SparkContext(appName="PythonStreamingFlumeWordCount")
>     ssc = StreamingContext(sc, 10)
>     kvs = FlumeUtils.createPollingStream(ssc, [[hostname, int(port)]], maxBatchSize=10, parallelism=1)
>     lines = kvs.map(lambda x: x[1])
>     counts = lines.flatMap(lambda line: line.split(" ")) \
>         .map(lambda word: (word, 1)) \
>         .reduceByKey(lambda a, b: a+b)
>     counts.pprint()
>     ssc.start()
>     ssc.awaitTermination()
> {code}
> Whatever I do, I always get this on Flume:
> {code}2016-08-19 14:04:48,679 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7d257a9f, /78.10.176.10:57551 => /10.28.230.185:5005] BOUND: /10.28.230.185:5005
> 2016-08-19 14:04:48,679 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7d257a9f, /78.10.176.10:57551 => /10.28.230.185:5005] CONNECTED: /78.10.176.10:57551
> 2016-08-19 14:04:49,401 (New I/O  worker #1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from channel! 
> 2016-08-19 14:04:50,154 (Spark Sink Processor Thread - 1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:80)] Error while processing transaction.
> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>         at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.begin(SpillableMemoryChannel.java:305)
>         at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
>         at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
>         at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
>         at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 2016-08-19 14:04:50,156 (Spark Sink Processor Thread - 1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to successfully process the events. Transaction is being rolled back.
> 2016-08-19 14:04:50,156 (Spark Sink Processor Thread - 1) [ERROR - org.apache.spark.streaming.flume.sink.Logging$class.logError(Logging.scala:84)] Error rolling back transaction. Rollback may have failed!
> {code}
> I'm using the latest tgz releases of Spark and Flume. The additional packages for Flume were downloaded from Maven Central as jars.
> Can anyone help with this?
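
For readers unfamiliar with the exception in the log above, here is a minimal Python sketch of the kind of state check that produces "begin() called when transaction is OPEN!". It is an illustration only, not Flume's or Spark's code: the names SketchTransaction, SketchChannel, TxState and demo are hypothetical, and the sketch assumes that a channel hands each thread the same transaction object until that transaction has been closed. It shows how a second begin() on a still-open transaction fails; it does not diagnose the root cause of this report.

```python
import threading
from enum import Enum


class TxState(Enum):
    NEW = "NEW"
    OPEN = "OPEN"
    COMPLETED = "COMPLETED"
    CLOSED = "CLOSED"


class SketchTransaction:
    """Toy transaction (hypothetical) with per-operation state checks."""

    def __init__(self):
        self.state = TxState.NEW

    def _check(self, allowed, op):
        # Reject any operation that is not legal in the current state.
        if self.state not in allowed:
            raise RuntimeError(
                "%s() called when transaction is %s!" % (op, self.state.value))

    def begin(self):
        self._check({TxState.NEW}, "begin")
        self.state = TxState.OPEN

    def commit(self):
        self._check({TxState.OPEN}, "commit")
        self.state = TxState.COMPLETED

    def rollback(self):
        self._check({TxState.OPEN}, "rollback")
        self.state = TxState.COMPLETED

    def close(self):
        self._check({TxState.NEW, TxState.COMPLETED}, "close")
        self.state = TxState.CLOSED


class SketchChannel:
    """Toy channel (hypothetical): one transaction per thread until closed."""

    def __init__(self):
        self._local = threading.local()

    def get_transaction(self):
        tx = getattr(self._local, "tx", None)
        # Assumption: a new transaction is created only when the thread has
        # none yet, or when its previous one has been closed.
        if tx is None or tx.state is TxState.CLOSED:
            tx = SketchTransaction()
            self._local.tx = tx
        return tx


def demo():
    """Begin a transaction twice on one thread without closing it."""
    channel = SketchChannel()
    first = channel.get_transaction()
    first.begin()
    second = channel.get_transaction()  # same object: 'first' is not closed
    try:
        second.begin()
    except RuntimeError as err:
        return str(err)
    return None


if __name__ == "__main__":
    print(demo())
```

Under these assumptions, the error goes away only if the earlier transaction is committed or rolled back and then closed before the same thread asks the channel for a transaction again.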


