Posted to issues@spark.apache.org by "Yan Chen (JIRA)" <ji...@apache.org> on 2016/06/01 21:33:59 UTC
[jira] [Created] (SPARK-15716) Memory usage keep growing up in Spark Streaming
Yan Chen created SPARK-15716:
--------------------------------
Summary: Memory usage keep growing up in Spark Streaming
Key: SPARK-15716
URL: https://issues.apache.org/jira/browse/SPARK-15716
Project: Spark
Issue Type: Bug
Components: Streaming
Affects Versions: 1.4.1
Environment: Oracle Java 1.8.0_51, SUSE Linux
Reporter: Yan Chen
Priority: Critical
Code:
{code:java}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
public class App {
  public static void main(String[] args) {
    final String input = args[0];
    final String check = args[1];
    final long interval = Long.parseLong(args[2]);
    final SparkConf conf = new SparkConf();
    conf.set("spark.yarn.access.namenodes", "hdfs://RBCDHDPA1");
    conf.set("spark.streaming.minRememberDuration", "180s");
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
    conf.set("spark.streaming.unpersist", "true");
    conf.set("spark.streaming.ui.retainedBatches", "10");
    conf.set("spark.ui.retainedJobs", "10");
    conf.set("spark.ui.retainedStages", "10");
    conf.set("spark.worker.ui.retainedExecutors", "10");
    conf.set("spark.worker.ui.retainedDrivers", "10");
    conf.set("spark.sql.ui.retainedExecutions", "10");
    JavaStreamingContextFactory jscf = () -> {
      SparkContext sc = new SparkContext(conf);
      sc.setCheckpointDir(check);
      StreamingContext ssc = new StreamingContext(sc, Durations.milliseconds(interval));
      JavaStreamingContext jssc = new JavaStreamingContext(ssc);
      jssc.checkpoint(check);
      // setup pipeline here
      JavaPairDStream<LongWritable, Text> inputStream =
          jssc.fileStream(
              input,
              LongWritable.class,
              Text.class,
              TextInputFormat.class,
              (filepath) -> Boolean.TRUE,
              false
          );
      JavaPairDStream<LongWritable, Text> usbk = inputStream
          .updateStateByKey((current, state) -> state);
      usbk.checkpoint(Durations.seconds(10));
      usbk.foreachRDD(rdd -> {
        rdd.count();
        System.out.println("usbk: " + rdd.toDebugString().split("\n").length);
        return null;
      });
      return jssc;
    };
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(check, jscf);
    jssc.start();
    jssc.awaitTermination();
  }
}
{code}
This is a very simple piece of code, but when I run it, the memory usage of the driver keeps going up.
!http://i.imgur.com/uSzUui6.png!
The four rightmost red triangles are full GCs, which I triggered manually with the "jcmd pid GC.run" command.
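The growth could also be tracked from inside the driver JVM rather than read off a chart. The following is only a minimal sketch under that assumption: HeapProbe and its methods are hypothetical names that are not part of the code above, and it relies solely on the standard java.lang.management API. Calling snapshot() periodically in the driver would give a number per sample to compare over time.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Locale;

// Hypothetical helper (not part of the reported application).
public class HeapProbe {

    /** Returns the heap bytes currently in use, as reported by the JVM. */
    public static long usedHeapBytes() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        return memory.getHeapMemoryUsage().getUsed();
    }

    /** Formats a byte count as mebibytes with one decimal place. */
    public static String formatMiB(long bytes) {
        return String.format(Locale.ROOT, "%.1f MiB", bytes / (1024.0 * 1024.0));
    }

    /**
     * Requests a garbage collection, then reports the heap still in use.
     * The request is equivalent to System.gc(), so the JVM may ignore it
     * (for example when started with -XX:+DisableExplicitGC).
     */
    public static String snapshot(String label) {
        ManagementFactory.getMemoryMXBean().gc();
        return label + ": " + formatMiB(usedHeapBytes());
    }

    public static void main(String[] args) {
        System.out.println(snapshot("driver heap after GC"));
    }
}
```

If the value reported after each requested GC keeps rising across samples, that would suggest objects are being retained rather than merely awaiting collection.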