You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 07:22:31 UTC

[1/3] git commit: Set default logging to WARN for Spark streaming examples.

Updated Branches:
  refs/heads/master 300eaa994 -> 997c830e0


Set default logging to WARN for Spark streaming examples.

This programmatically sets the log level to WARN by default for streaming
tests. If the user has already specified a log4j.properties file,
the user's file will take precedence over this default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/35f80da2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/35f80da2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/35f80da2

Branch: refs/heads/master
Commit: 35f80da21aaea8c6fde089754ef3a86bc78e0428
Parents: 04d83fc
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Jan 8 20:54:24 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 9 10:42:58 2014 -0800

----------------------------------------------------------------------
 .../streaming/examples/JavaFlumeEventCount.java |  2 ++
 .../streaming/examples/JavaKafkaWordCount.java  |  2 ++
 .../examples/JavaNetworkWordCount.java          |  2 ++
 .../streaming/examples/JavaQueueStream.java     |  2 ++
 .../streaming/examples/ActorWordCount.scala     | 12 +++++------
 .../streaming/examples/FlumeEventCount.scala    |  4 +++-
 .../streaming/examples/HdfsWordCount.scala      |  3 ++-
 .../streaming/examples/KafkaWordCount.scala     |  5 +++--
 .../streaming/examples/MQTTWordCount.scala      |  8 +++-----
 .../streaming/examples/NetworkWordCount.scala   |  2 ++
 .../spark/streaming/examples/QueueStream.scala  |  8 +++++---
 .../streaming/examples/RawNetworkGrep.scala     |  5 +++--
 .../examples/StatefulNetworkWordCount.scala     |  2 ++
 .../streaming/examples/StreamingExamples.scala  | 21 ++++++++++++++++++++
 .../streaming/examples/TwitterAlgebirdCMS.scala | 10 ++++++----
 .../streaming/examples/TwitterAlgebirdHLL.scala |  9 ++++++---
 .../streaming/examples/TwitterPopularTags.scala |  2 ++
 .../streaming/examples/ZeroMQWordCount.scala    |  1 +
 .../examples/clickstream/PageViewStream.scala   |  4 +++-
 .../org/apache/spark/streaming/DStream.scala    |  2 +-
 .../apache/spark/streaming/DStreamGraph.scala   |  8 ++++----
 21 files changed, 80 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index b11cfa6..7b5a243 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -47,6 +47,8 @@ public final class JavaFlumeEventCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     String master = args[0];
     String host = args[1];
     int port = Integer.parseInt(args[2]);

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 16b8a94..04f62ee 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -59,6 +59,8 @@ public final class JavaKafkaWordCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context with a 1 second batch size
     JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
             new Duration(2000), System.getenv("SPARK_HOME"),

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 1e2efd3..c37b0ca 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -53,6 +53,8 @@ public final class JavaNetworkWordCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context with a 1 second batch size
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
             new Duration(1000), System.getenv("SPARK_HOME"),

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index e05551a..7ef9c6c 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -41,6 +41,8 @@ public final class JavaQueueStream {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
             System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 4e0058c..57e1b1f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -18,17 +18,13 @@
 package org.apache.spark.streaming.examples
 
 import scala.collection.mutable.LinkedList
-import scala.util.Random
 import scala.reflect.ClassTag
+import scala.util.Random
 
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.actorRef2Scala
+import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.receivers.Receiver
 import org.apache.spark.util.AkkaUtils
@@ -147,6 +143,8 @@ object ActorWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Seq(master, host, port) = args.toSeq
 
     // Create the context and set the batch size

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index ae3709b..a59be78 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.util.IntParam
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
 
 /**
  *  Produces a count of events received from Flume.
@@ -44,6 +44,8 @@ object FlumeEventCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, host, IntParam(port)) = args
 
     val batchInterval = Milliseconds(2000)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index ea6ea67..704b315 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
-
 /**
  * Counts words in new text files created in the given directory
  * Usage: HdfsWordCount <master> <directory>
@@ -38,6 +37,8 @@ object HdfsWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context
     val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 31a94bd..4a3d81c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -23,8 +23,8 @@ import kafka.producer._
 
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.util.RawTextHelper._
 import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.util.RawTextHelper._
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -40,12 +40,13 @@ import org.apache.spark.streaming.kafka._
  */
 object KafkaWordCount {
   def main(args: Array[String]) {
-    
     if (args.length < 5) {
       System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, zkQuorum, group, topics, numThreads) = args
 
     val ssc =  new StreamingContext(master, "KafkaWordCount", Seconds(2),

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 325290b..78b49fd 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,12 +17,8 @@
 
 package org.apache.spark.streaming.examples
 
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -43,6 +39,8 @@ object MQTTPublisher {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Seq(brokerUrl, topic) = args.toSeq
 
     try {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 6a32c75..c12139b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -39,6 +39,8 @@ object NetworkWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index 9d640e7..4d4968b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.streaming.examples
 
+import scala.collection.mutable.SynchronizedQueue
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
-import scala.collection.mutable.SynchronizedQueue
-
 object QueueStream {
   
   def main(args: Array[String]) {
@@ -30,7 +30,9 @@ object QueueStream {
       System.err.println("Usage: QueueStream <master>")
       System.exit(1)
     }
-    
+
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context
     val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index c0706d0..3d08d86 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.util.IntParam
 import org.apache.spark.storage.StorageLevel
-
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.util.RawTextHelper
+import org.apache.spark.util.IntParam
 
 /**
  * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
@@ -45,6 +44,8 @@ object RawNetworkGrep {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
 
     // Create the context

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 002db57..1183eba 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -39,6 +39,8 @@ object StatefulNetworkWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val updateFunc = (values: Seq[Int], state: Option[Int]) => {
       val currentCount = values.foldLeft(0)(_ + _)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
new file mode 100644
index 0000000..d41d84a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.Logging
+
+import org.apache.log4j.{Level, Logger}
+
+/** Utility functions for Spark Streaming examples. */
+object StreamingExamples extends Logging {
+
+  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+  def setStreamingLogLevels() {
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      // We first log something to initialize Spark's default logging, then we override the
+      // logging level.
+      logInfo("Setting log level to [WARN] for streaming example." +
+        " To override add a custom log4j.properties to the classpath.")
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 3ccdc90..80b5a98 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
 import com.twitter.algebird._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
 
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.twitter._
 
 /**
@@ -51,6 +51,8 @@ object TwitterAlgebirdCMS {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     // CMS parameters
     val DELTA = 1E-3
     val EPS = 0.01

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index c7e83e7..cb2f2c5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
-import com.twitter.algebird.HyperLogLog._
 import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.twitter._
 
 /**
@@ -44,6 +45,8 @@ object TwitterAlgebirdHLL {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
     val BIT_SIZE = 12
     val (master, filters) = (args.head, args.tail)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index e2b0418..16c10fe 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -36,6 +36,8 @@ object TwitterPopularTags {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val (master, filters) = (args.head, args.tail)
 
     val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 03902ec..12d2a10 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -76,6 +76,7 @@ object ZeroMQWordCount {
           "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
     }
+    StreamingExamples.setStreamingLogLevels()
     val Seq(master, url, topic) = args.toSeq
 
     // Create the context and set the batch size

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index 807af19..da6b67b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.streaming.examples.clickstream
 
+import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.examples.StreamingExamples
 
 /** Analyses a streaming dataset of web page views. This class demonstrates several types of
   * operators available in Spark streaming.
@@ -36,6 +37,7 @@ object PageViewStream {
                          " errorRatePerZipCode, activeUserCount, popularUsersSeen")
       System.exit(1)
     }
+    StreamingExamples.setStreamingLogLevels()
     val metric = args(0)
     val host = args(1)
     val port = args(2).toInt

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 00671ba..837f1ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -333,7 +333,7 @@ abstract class DStream[T: ClassTag] (
     var numForgotten = 0
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
     generatedRDDs --= oldRDDs.keys
-    logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
     dependencies.foreach(_.clearOldMetadata(time))
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/35f80da2/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index a09b891..62d07b2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -105,18 +105,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
 
   def generateJobs(time: Time): Seq[Job] = {
     this.synchronized {
-      logInfo("Generating jobs for time " + time)
+      logDebug("Generating jobs for time " + time)
       val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
-      logInfo("Generated " + jobs.length + " jobs for time " + time)
+      logDebug("Generated " + jobs.length + " jobs for time " + time)
       jobs
     }
   }
 
   def clearOldMetadata(time: Time) {
     this.synchronized {
-      logInfo("Clearing old metadata for time " + time)
+      logDebug("Clearing old metadata for time " + time)
       outputStreams.foreach(_.clearOldMetadata(time))
-      logInfo("Cleared old metadata for time " + time)
+      logDebug("Cleared old metadata for time " + time)
     }
   }
 


[3/3] git commit: Merge pull request #363 from pwendell/streaming-logs

Posted by pw...@apache.org.
Merge pull request #363 from pwendell/streaming-logs

Set default logging to WARN for Spark streaming examples.

This programmatically sets the log level to WARN by default for streaming
tests. If the user has already specified a log4j.properties file,
the user's file will take precedence over this default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/997c830e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/997c830e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/997c830e

Branch: refs/heads/master
Commit: 997c830e0b09e8be5000368f0d3e3c4516854bca
Parents: 300eaa9 7b748b8
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 9 22:22:20 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 9 22:22:20 2014 -0800

----------------------------------------------------------------------
 .../streaming/examples/JavaFlumeEventCount.java |  2 ++
 .../streaming/examples/JavaKafkaWordCount.java  |  2 ++
 .../examples/JavaNetworkWordCount.java          |  4 +++-
 .../streaming/examples/JavaQueueStream.java     |  2 ++
 .../streaming/examples/ActorWordCount.scala     | 12 +++++------
 .../streaming/examples/FlumeEventCount.scala    |  4 +++-
 .../streaming/examples/HdfsWordCount.scala      |  3 ++-
 .../streaming/examples/KafkaWordCount.scala     |  5 +++--
 .../streaming/examples/MQTTWordCount.scala      |  8 +++-----
 .../streaming/examples/NetworkWordCount.scala   |  2 ++
 .../spark/streaming/examples/QueueStream.scala  |  8 +++++---
 .../streaming/examples/RawNetworkGrep.scala     |  5 +++--
 .../examples/StatefulNetworkWordCount.scala     |  2 ++
 .../streaming/examples/StreamingExamples.scala  | 21 ++++++++++++++++++++
 .../streaming/examples/TwitterAlgebirdCMS.scala | 10 ++++++----
 .../streaming/examples/TwitterAlgebirdHLL.scala |  9 ++++++---
 .../streaming/examples/TwitterPopularTags.scala |  2 ++
 .../streaming/examples/ZeroMQWordCount.scala    |  1 +
 .../examples/clickstream/PageViewStream.scala   |  4 +++-
 .../org/apache/spark/streaming/DStream.scala    |  2 +-
 .../apache/spark/streaming/DStreamGraph.scala   |  8 ++++----
 21 files changed, 81 insertions(+), 35 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: Minor clean-up

Posted by pw...@apache.org.
Minor clean-up


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7b748b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7b748b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7b748b83

Branch: refs/heads/master
Commit: 7b748b83a124a2b9c692da4a0c9285f4efa431b2
Parents: 35f80da
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 9 20:42:48 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 9 20:42:48 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/examples/JavaNetworkWordCount.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7b748b83/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index c37b0ca..2e616b1 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -38,7 +38,7 @@ import java.util.regex.Pattern;
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ *    `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
  */
 public final class JavaNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");