You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Something Something <ma...@gmail.com> on 2014/11/11 00:01:53 UTC
JavaKafkaWordCount not working under Spark Streaming
I am embarrassed to admit but I can't get a basic 'word count' to work
under Kafka/Spark streaming. My code looks like this. I don't see any
word counts in console output. Also, don't see any output in UI. Needless
to say, I am newbie in both 'Spark' as well as 'Kafka'.
Please help. Thanks.
Here's the code:
public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
<group> <topics> <numThreads>");
System.exit(1);
}
// StreamingExamples.setStreamingLogLevels();
// SparkConf sparkConf = new
SparkConf().setAppName("JavaKafkaWordCount");
// Location of the Spark directory
String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
// URL of the Spark cluster
String sparkUrl = "spark://mymachine:7077";
// Location of the required JAR files
String jarFiles =
"./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("JavaKafkaWordCount");
sparkConf.setJars(new String[]{jarFiles});
sparkConf.setMaster(sparkUrl);
sparkConf.set("spark.ui.port", "2348");
sparkConf.setSparkHome(sparkHome);
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "myedgenode:2181");
kafkaParams.put("group.id", "1");
kafkaParams.put("metadata.broker.list", "myedgenode:9092");
kafkaParams.put("serializer.class",
"kafka.serializer.StringEncoder");
kafkaParams.put("request.required.acks", "1");
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
Duration(2000));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
// JavaPairReceiverInputDStream<String, String> messages =
// KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaPairDStream<String, String> messages =
KafkaUtils.createStream(jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicMap,
StorageLevel.MEMORY_ONLY_SER());
JavaDStream<String> lines = messages.map(new
Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
jssc.start();
jssc.awaitTermination();
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by rusty <ra...@infoobjects.com>.
Check that you get the data from kafka producer
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws
Exception {
List<String> collect = rdd.collect();
for (String data : collect) {
try {
// save data in the log.txt
file
Path filePath = Paths
.get(rdd
save file);
if (!Files.exists(filePath))
{
Files.createFile(filePath);
}
String temp = "Text to be
added" + " data is " + data;
Files.write(filePath,
temp.getBytes(),
StandardOpenOption.APPEND);
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
});
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by Chia-Chun Shih <ch...@gmail.com>.
You can run this command in kafka directory
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect
myedgenode:2181 --group 1
(in which 1 is your consumer group id)
And check the two columns:
- Offset: should > 0 if your program (or you) have successfully produce
data, and should increase if your program produces more data
- Lag: should == 0 if Spark has consumed all data in Kafka
This command can help you identify whether the problem occurs in producer
side or in consumer side.
regards,
Chia-Chun
2014-11-11 15:10 GMT+08:00 Akhil Das <ak...@sigmoidanalytics.com>:
> Here's a simple working version.
>
>
> import com.google.common.collect.Lists;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import scala.Tuple2;
>
> import java.util.HashMap;
> import java.util.Map;
>
> /**
> * Created by akhld on 11/11/14.
> */
>
> public class KafkaWordcount {
>
> public static void main(String[] args) {
>
> // Location of the Spark directory
> String sparkHome = "/home/akhld/mobi/localcluster/spark-1";
>
> // URL of the Spark cluster
> String sparkUrl = "spark://akhldz:7077";
>
> // Location of the required JAR files
> String jarFiles =
> "/home/akhld/mobi/temp/kafkwc.jar,/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar";
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.setAppName("JavaKafkaWordCount");
> sparkConf.setJars(new String[]{jarFiles});
> sparkConf.setMaster(sparkUrl);
> sparkConf.setSparkHome(sparkHome);
>
> //These are the minimal things that are required
>
> *Map<String, Integer> topicMap = new HashMap<String, Integer>();*
> * topicMap.put("test", 1);*
> * String kafkaGroup = "groups";*
> * String zkQuorum = "localhost:2181";*
>
> // Create the context with a 1 second batch size
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> new Duration(2000));
>
> JavaPairDStream<String, String> messages =
> KafkaUtils.createStream(jssc, zkQuorum,
> kafkaGroup, topicMap);
>
>
> JavaDStream<String> lines = messages.map(new
> Function<Tuple2<String, String>, String>() {
> @Override
> public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
> }
> });
>
> JavaDStream<String> words = lines.flatMap(new
> FlatMapFunction<String, String>() {
> @Override
> public Iterable<String> call(String x) {
> return Lists.newArrayList(x.split(" "));
> }
> });
>
> JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> new PairFunction<String, String, Integer>() {
> @Override
> public Tuple2<String, Integer> call(String s) {
> return new Tuple2<String, Integer>(s, 1);
> }
> }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> @Override
> public Integer call(Integer i1, Integer i2) {
> return i1 + i2;
> }
> });
>
> wordCounts.print();
> jssc.start();
> jssc.awaitTermination();
>
>
> }
>
> }
>
>
> [image: Inline image 1]
>
> Thanks
> Best Regards
>
> On Tue, Nov 11, 2014 at 5:37 AM, Something Something <
> mailinglists19@gmail.com> wrote:
>
>> I am not running locally. The Spark master is:
>>
>> "spark://<machine name>:7077"
>>
>>
>>
>> On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> What is the Spark master that you are using. Use local[4], not local
>>> if you are running locally.
>>>
>>> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
>>> <ma...@gmail.com> wrote:
>>> > I am embarrassed to admit but I can't get a basic 'word count' to work
>>> under
>>> > Kafka/Spark streaming. My code looks like this. I don't see any word
>>> > counts in console output. Also, don't see any output in UI. Needless
>>> to
>>> > say, I am newbie in both 'Spark' as well as 'Kafka'.
>>> >
>>> > Please help. Thanks.
>>> >
>>> > Here's the code:
>>> >
>>> > public static void main(String[] args) {
>>> > if (args.length < 4) {
>>> > System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
>>> <group>
>>> > <topics> <numThreads>");
>>> > System.exit(1);
>>> > }
>>> >
>>> > // StreamingExamples.setStreamingLogLevels();
>>> > // SparkConf sparkConf = new
>>> > SparkConf().setAppName("JavaKafkaWordCount");
>>> >
>>> > // Location of the Spark directory
>>> > String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>>> >
>>> > // URL of the Spark cluster
>>> > String sparkUrl = "spark://mymachine:7077";
>>> >
>>> > // Location of the required JAR files
>>> > String jarFiles =
>>> >
>>> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>>> >
>>> > SparkConf sparkConf = new SparkConf();
>>> > sparkConf.setAppName("JavaKafkaWordCount");
>>> > sparkConf.setJars(new String[]{jarFiles});
>>> > sparkConf.setMaster(sparkUrl);
>>> > sparkConf.set("spark.ui.port", "2348");
>>> > sparkConf.setSparkHome(sparkHome);
>>> >
>>> > Map<String, String> kafkaParams = new HashMap<String,
>>> String>();
>>> > kafkaParams.put("zookeeper.connect", "myedgenode:2181");
>>> > kafkaParams.put("group.id", "1");
>>> > kafkaParams.put("metadata.broker.list", "myedgenode:9092");
>>> > kafkaParams.put("serializer.class",
>>> > "kafka.serializer.StringEncoder");
>>> > kafkaParams.put("request.required.acks", "1");
>>> >
>>> > // Create the context with a 1 second batch size
>>> > JavaStreamingContext jssc = new
>>> JavaStreamingContext(sparkConf, new
>>> > Duration(2000));
>>> >
>>> > int numThreads = Integer.parseInt(args[3]);
>>> > Map<String, Integer> topicMap = new HashMap<String, Integer>();
>>> > String[] topics = args[2].split(",");
>>> > for (String topic: topics) {
>>> > topicMap.put(topic, numThreads);
>>> > }
>>> >
>>> > // JavaPairReceiverInputDStream<String, String> messages =
>>> > // KafkaUtils.createStream(jssc, args[0], args[1],
>>> topicMap);
>>> > JavaPairDStream<String, String> messages =
>>> > KafkaUtils.createStream(jssc,
>>> > String.class,
>>> > String.class,
>>> > StringDecoder.class,
>>> > StringDecoder.class,
>>> > kafkaParams,
>>> > topicMap,
>>> > StorageLevel.MEMORY_ONLY_SER());
>>> >
>>> >
>>> > JavaDStream<String> lines = messages.map(new
>>> Function<Tuple2<String,
>>> > String>, String>() {
>>> > @Override
>>> > public String call(Tuple2<String, String> tuple2) {
>>> > return tuple2._2();
>>> > }
>>> > });
>>> >
>>> > JavaDStream<String> words = lines.flatMap(new
>>> > FlatMapFunction<String, String>() {
>>> > @Override
>>> > public Iterable<String> call(String x) {
>>> > return Lists.newArrayList(SPACE.split(x));
>>> > }
>>> > });
>>> >
>>> > JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>>> > new PairFunction<String, String, Integer>() {
>>> > @Override
>>> > public Tuple2<String, Integer> call(String s) {
>>> > return new Tuple2<String, Integer>(s, 1);
>>> > }
>>> > }).reduceByKey(new Function2<Integer, Integer,
>>> Integer>() {
>>> > @Override
>>> > public Integer call(Integer i1, Integer i2) {
>>> > return i1 + i2;
>>> > }
>>> > });
>>> >
>>> > wordCounts.print();
>>> > jssc.start();
>>> > jssc.awaitTermination();
>>> >
>>>
>>
>>
>
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Here's a simple working version.
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;
/**
* Created by akhld on 11/11/14.
*/
public class KafkaWordcount {
public static void main(String[] args) {
// Location of the Spark directory
String sparkHome = "/home/akhld/mobi/localcluster/spark-1";
// URL of the Spark cluster
String sparkUrl = "spark://akhldz:7077";
// Location of the required JAR files
String jarFiles =
"/home/akhld/mobi/temp/kafkwc.jar,/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar";
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("JavaKafkaWordCount");
sparkConf.setJars(new String[]{jarFiles});
sparkConf.setMaster(sparkUrl);
sparkConf.setSparkHome(sparkHome);
//These are the minimal things that are required
*Map<String, Integer> topicMap = new HashMap<String, Integer>();*
* topicMap.put("test", 1);*
* String kafkaGroup = "groups";*
* String zkQuorum = "localhost:2181";*
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
Duration(2000));
JavaPairDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum,
kafkaGroup, topicMap);
JavaDStream<String> lines = messages.map(new
Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split(" "));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
[image: Inline image 1]
Thanks
Best Regards
On Tue, Nov 11, 2014 at 5:37 AM, Something Something <
mailinglists19@gmail.com> wrote:
> I am not running locally. The Spark master is:
>
> "spark://<machine name>:7077"
>
>
>
> On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> What is the Spark master that you are using. Use local[4], not local
>> if you are running locally.
>>
>> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
>> <ma...@gmail.com> wrote:
>> > I am embarrassed to admit but I can't get a basic 'word count' to work
>> under
>> > Kafka/Spark streaming. My code looks like this. I don't see any word
>> > counts in console output. Also, don't see any output in UI. Needless
>> to
>> > say, I am newbie in both 'Spark' as well as 'Kafka'.
>> >
>> > Please help. Thanks.
>> >
>> > Here's the code:
>> >
>> > public static void main(String[] args) {
>> > if (args.length < 4) {
>> > System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
>> <group>
>> > <topics> <numThreads>");
>> > System.exit(1);
>> > }
>> >
>> > // StreamingExamples.setStreamingLogLevels();
>> > // SparkConf sparkConf = new
>> > SparkConf().setAppName("JavaKafkaWordCount");
>> >
>> > // Location of the Spark directory
>> > String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>> >
>> > // URL of the Spark cluster
>> > String sparkUrl = "spark://mymachine:7077";
>> >
>> > // Location of the required JAR files
>> > String jarFiles =
>> >
>> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>> >
>> > SparkConf sparkConf = new SparkConf();
>> > sparkConf.setAppName("JavaKafkaWordCount");
>> > sparkConf.setJars(new String[]{jarFiles});
>> > sparkConf.setMaster(sparkUrl);
>> > sparkConf.set("spark.ui.port", "2348");
>> > sparkConf.setSparkHome(sparkHome);
>> >
>> > Map<String, String> kafkaParams = new HashMap<String, String>();
>> > kafkaParams.put("zookeeper.connect", "myedgenode:2181");
>> > kafkaParams.put("group.id", "1");
>> > kafkaParams.put("metadata.broker.list", "myedgenode:9092");
>> > kafkaParams.put("serializer.class",
>> > "kafka.serializer.StringEncoder");
>> > kafkaParams.put("request.required.acks", "1");
>> >
>> > // Create the context with a 1 second batch size
>> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> new
>> > Duration(2000));
>> >
>> > int numThreads = Integer.parseInt(args[3]);
>> > Map<String, Integer> topicMap = new HashMap<String, Integer>();
>> > String[] topics = args[2].split(",");
>> > for (String topic: topics) {
>> > topicMap.put(topic, numThreads);
>> > }
>> >
>> > // JavaPairReceiverInputDStream<String, String> messages =
>> > // KafkaUtils.createStream(jssc, args[0], args[1],
>> topicMap);
>> > JavaPairDStream<String, String> messages =
>> > KafkaUtils.createStream(jssc,
>> > String.class,
>> > String.class,
>> > StringDecoder.class,
>> > StringDecoder.class,
>> > kafkaParams,
>> > topicMap,
>> > StorageLevel.MEMORY_ONLY_SER());
>> >
>> >
>> > JavaDStream<String> lines = messages.map(new
>> Function<Tuple2<String,
>> > String>, String>() {
>> > @Override
>> > public String call(Tuple2<String, String> tuple2) {
>> > return tuple2._2();
>> > }
>> > });
>> >
>> > JavaDStream<String> words = lines.flatMap(new
>> > FlatMapFunction<String, String>() {
>> > @Override
>> > public Iterable<String> call(String x) {
>> > return Lists.newArrayList(SPACE.split(x));
>> > }
>> > });
>> >
>> > JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>> > new PairFunction<String, String, Integer>() {
>> > @Override
>> > public Tuple2<String, Integer> call(String s) {
>> > return new Tuple2<String, Integer>(s, 1);
>> > }
>> > }).reduceByKey(new Function2<Integer, Integer,
>> Integer>() {
>> > @Override
>> > public Integer call(Integer i1, Integer i2) {
>> > return i1 + i2;
>> > }
>> > });
>> >
>> > wordCounts.print();
>> > jssc.start();
>> > jssc.awaitTermination();
>> >
>>
>
>
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Here's a simple working version.
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;
/**
* Created by akhld on 11/11/14.
*/
public class KafkaWordcount {
public static void main(String[] args) {
// Location of the Spark directory
String sparkHome = "/home/akhld/mobi/localcluster/spark-1";
// URL of the Spark cluster
String sparkUrl = "spark://akhldz:7077";
// Location of the required JAR files
String jarFiles =
"/home/akhld/mobi/temp/kafkwc.jar,/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar";
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("JavaKafkaWordCount");
sparkConf.setJars(new String[]{jarFiles});
sparkConf.setMaster(sparkUrl);
sparkConf.setSparkHome(sparkHome);
//These are the minimal things that are required
*Map<String, Integer> topicMap = new HashMap<String, Integer>();*
* topicMap.put("test", 1);*
* String kafkaGroup = "groups";*
* String zkQuorum = "localhost:2181";*
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
Duration(2000));
JavaPairDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum,
kafkaGroup, topicMap);
JavaDStream<String> lines = messages.map(new
Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split(" "));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
[image: Inline image 1]
Thanks
Best Regards
On Tue, Nov 11, 2014 at 5:37 AM, Something Something <
mailinglists19@gmail.com> wrote:
> I am not running locally. The Spark master is:
>
> "spark://<machine name>:7077"
>
>
>
> On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> What is the Spark master that you are using. Use local[4], not local
>> if you are running locally.
>>
>> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
>> <ma...@gmail.com> wrote:
>> > I am embarrassed to admit but I can't get a basic 'word count' to work
>> under
>> > Kafka/Spark streaming. My code looks like this. I don't see any word
>> > counts in console output. Also, don't see any output in UI. Needless
>> to
>> > say, I am newbie in both 'Spark' as well as 'Kafka'.
>> >
>> > Please help. Thanks.
>> >
>> > Here's the code:
>> >
>> > public static void main(String[] args) {
>> > if (args.length < 4) {
>> > System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
>> <group>
>> > <topics> <numThreads>");
>> > System.exit(1);
>> > }
>> >
>> > // StreamingExamples.setStreamingLogLevels();
>> > // SparkConf sparkConf = new
>> > SparkConf().setAppName("JavaKafkaWordCount");
>> >
>> > // Location of the Spark directory
>> > String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>> >
>> > // URL of the Spark cluster
>> > String sparkUrl = "spark://mymachine:7077";
>> >
>> > // Location of the required JAR files
>> > String jarFiles =
>> >
>> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>> >
>> > SparkConf sparkConf = new SparkConf();
>> > sparkConf.setAppName("JavaKafkaWordCount");
>> > sparkConf.setJars(new String[]{jarFiles});
>> > sparkConf.setMaster(sparkUrl);
>> > sparkConf.set("spark.ui.port", "2348");
>> > sparkConf.setSparkHome(sparkHome);
>> >
>> > Map<String, String> kafkaParams = new HashMap<String, String>();
>> > kafkaParams.put("zookeeper.connect", "myedgenode:2181");
>> > kafkaParams.put("group.id", "1");
>> > kafkaParams.put("metadata.broker.list", "myedgenode:9092");
>> > kafkaParams.put("serializer.class",
>> > "kafka.serializer.StringEncoder");
>> > kafkaParams.put("request.required.acks", "1");
>> >
>> > // Create the context with a 1 second batch size
>> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> new
>> > Duration(2000));
>> >
>> > int numThreads = Integer.parseInt(args[3]);
>> > Map<String, Integer> topicMap = new HashMap<String, Integer>();
>> > String[] topics = args[2].split(",");
>> > for (String topic: topics) {
>> > topicMap.put(topic, numThreads);
>> > }
>> >
>> > // JavaPairReceiverInputDStream<String, String> messages =
>> > // KafkaUtils.createStream(jssc, args[0], args[1],
>> topicMap);
>> > JavaPairDStream<String, String> messages =
>> > KafkaUtils.createStream(jssc,
>> > String.class,
>> > String.class,
>> > StringDecoder.class,
>> > StringDecoder.class,
>> > kafkaParams,
>> > topicMap,
>> > StorageLevel.MEMORY_ONLY_SER());
>> >
>> >
>> > JavaDStream<String> lines = messages.map(new
>> Function<Tuple2<String,
>> > String>, String>() {
>> > @Override
>> > public String call(Tuple2<String, String> tuple2) {
>> > return tuple2._2();
>> > }
>> > });
>> >
>> > JavaDStream<String> words = lines.flatMap(new
>> > FlatMapFunction<String, String>() {
>> > @Override
>> > public Iterable<String> call(String x) {
>> > return Lists.newArrayList(SPACE.split(x));
>> > }
>> > });
>> >
>> > JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>> > new PairFunction<String, String, Integer>() {
>> > @Override
>> > public Tuple2<String, Integer> call(String s) {
>> > return new Tuple2<String, Integer>(s, 1);
>> > }
>> > }).reduceByKey(new Function2<Integer, Integer,
>> Integer>() {
>> > @Override
>> > public Integer call(Integer i1, Integer i2) {
>> > return i1 + i2;
>> > }
>> > });
>> >
>> > wordCounts.print();
>> > jssc.start();
>> > jssc.awaitTermination();
>> >
>>
>
>
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by Something Something <ma...@gmail.com>.
I am not running locally. The Spark master is:
"spark://<machine name>:7077"
On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <ta...@gmail.com>
wrote:
> What is the Spark master that you are using. Use local[4], not local
> if you are running locally.
>
> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
> <ma...@gmail.com> wrote:
> > I am embarrassed to admit but I can't get a basic 'word count' to work
> under
> > Kafka/Spark streaming. My code looks like this. I don't see any word
> > counts in console output. Also, don't see any output in UI. Needless to
> > say, I am newbie in both 'Spark' as well as 'Kafka'.
> >
> > Please help. Thanks.
> >
> > Here's the code:
> >
> > public static void main(String[] args) {
> > if (args.length < 4) {
> > System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
> <group>
> > <topics> <numThreads>");
> > System.exit(1);
> > }
> >
> > // StreamingExamples.setStreamingLogLevels();
> > // SparkConf sparkConf = new
> > SparkConf().setAppName("JavaKafkaWordCount");
> >
> > // Location of the Spark directory
> > String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
> >
> > // URL of the Spark cluster
> > String sparkUrl = "spark://mymachine:7077";
> >
> > // Location of the required JAR files
> > String jarFiles =
> >
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
> >
> > SparkConf sparkConf = new SparkConf();
> > sparkConf.setAppName("JavaKafkaWordCount");
> > sparkConf.setJars(new String[]{jarFiles});
> > sparkConf.setMaster(sparkUrl);
> > sparkConf.set("spark.ui.port", "2348");
> > sparkConf.setSparkHome(sparkHome);
> >
> > Map<String, String> kafkaParams = new HashMap<String, String>();
> > kafkaParams.put("zookeeper.connect", "myedgenode:2181");
> > kafkaParams.put("group.id", "1");
> > kafkaParams.put("metadata.broker.list", "myedgenode:9092");
> > kafkaParams.put("serializer.class",
> > "kafka.serializer.StringEncoder");
> > kafkaParams.put("request.required.acks", "1");
> >
> > // Create the context with a 1 second batch size
> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> new
> > Duration(2000));
> >
> > int numThreads = Integer.parseInt(args[3]);
> > Map<String, Integer> topicMap = new HashMap<String, Integer>();
> > String[] topics = args[2].split(",");
> > for (String topic: topics) {
> > topicMap.put(topic, numThreads);
> > }
> >
> > // JavaPairReceiverInputDStream<String, String> messages =
> > // KafkaUtils.createStream(jssc, args[0], args[1],
> topicMap);
> > JavaPairDStream<String, String> messages =
> > KafkaUtils.createStream(jssc,
> > String.class,
> > String.class,
> > StringDecoder.class,
> > StringDecoder.class,
> > kafkaParams,
> > topicMap,
> > StorageLevel.MEMORY_ONLY_SER());
> >
> >
> > JavaDStream<String> lines = messages.map(new
> Function<Tuple2<String,
> > String>, String>() {
> > @Override
> > public String call(Tuple2<String, String> tuple2) {
> > return tuple2._2();
> > }
> > });
> >
> > JavaDStream<String> words = lines.flatMap(new
> > FlatMapFunction<String, String>() {
> > @Override
> > public Iterable<String> call(String x) {
> > return Lists.newArrayList(SPACE.split(x));
> > }
> > });
> >
> > JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> > new PairFunction<String, String, Integer>() {
> > @Override
> > public Tuple2<String, Integer> call(String s) {
> > return new Tuple2<String, Integer>(s, 1);
> > }
> > }).reduceByKey(new Function2<Integer, Integer,
> Integer>() {
> > @Override
> > public Integer call(Integer i1, Integer i2) {
> > return i1 + i2;
> > }
> > });
> >
> > wordCounts.print();
> > jssc.start();
> > jssc.awaitTermination();
> >
>
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by Something Something <ma...@gmail.com>.
I am not running locally. The Spark master is:
"spark://<machine name>:7077"
On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <ta...@gmail.com>
wrote:
> What is the Spark master that you are using. Use local[4], not local
> if you are running locally.
>
> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
> <ma...@gmail.com> wrote:
> > I am embarrassed to admit but I can't get a basic 'word count' to work
> under
> > Kafka/Spark streaming. My code looks like this. I don't see any word
> > counts in console output. Also, don't see any output in UI. Needless to
> > say, I am newbie in both 'Spark' as well as 'Kafka'.
> >
> > Please help. Thanks.
> >
> > Here's the code:
> >
> > public static void main(String[] args) {
> > if (args.length < 4) {
> > System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
> <group>
> > <topics> <numThreads>");
> > System.exit(1);
> > }
> >
> > // StreamingExamples.setStreamingLogLevels();
> > // SparkConf sparkConf = new
> > SparkConf().setAppName("JavaKafkaWordCount");
> >
> > // Location of the Spark directory
> > String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
> >
> > // URL of the Spark cluster
> > String sparkUrl = "spark://mymachine:7077";
> >
> > // Location of the required JAR files
> > String jarFiles =
> >
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
> >
> > SparkConf sparkConf = new SparkConf();
> > sparkConf.setAppName("JavaKafkaWordCount");
> > sparkConf.setJars(new String[]{jarFiles});
> > sparkConf.setMaster(sparkUrl);
> > sparkConf.set("spark.ui.port", "2348");
> > sparkConf.setSparkHome(sparkHome);
> >
> > Map<String, String> kafkaParams = new HashMap<String, String>();
> > kafkaParams.put("zookeeper.connect", "myedgenode:2181");
> > kafkaParams.put("group.id", "1");
> > kafkaParams.put("metadata.broker.list", "myedgenode:9092");
> > kafkaParams.put("serializer.class",
> > "kafka.serializer.StringEncoder");
> > kafkaParams.put("request.required.acks", "1");
> >
> > // Create the context with a 1 second batch size
> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> new
> > Duration(2000));
> >
> > int numThreads = Integer.parseInt(args[3]);
> > Map<String, Integer> topicMap = new HashMap<String, Integer>();
> > String[] topics = args[2].split(",");
> > for (String topic: topics) {
> > topicMap.put(topic, numThreads);
> > }
> >
> > // JavaPairReceiverInputDStream<String, String> messages =
> > // KafkaUtils.createStream(jssc, args[0], args[1],
> topicMap);
> > JavaPairDStream<String, String> messages =
> > KafkaUtils.createStream(jssc,
> > String.class,
> > String.class,
> > StringDecoder.class,
> > StringDecoder.class,
> > kafkaParams,
> > topicMap,
> > StorageLevel.MEMORY_ONLY_SER());
> >
> >
> > JavaDStream<String> lines = messages.map(new
> Function<Tuple2<String,
> > String>, String>() {
> > @Override
> > public String call(Tuple2<String, String> tuple2) {
> > return tuple2._2();
> > }
> > });
> >
> > JavaDStream<String> words = lines.flatMap(new
> > FlatMapFunction<String, String>() {
> > @Override
> > public Iterable<String> call(String x) {
> > return Lists.newArrayList(SPACE.split(x));
> > }
> > });
> >
> > JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> > new PairFunction<String, String, Integer>() {
> > @Override
> > public Tuple2<String, Integer> call(String s) {
> > return new Tuple2<String, Integer>(s, 1);
> > }
> > }).reduceByKey(new Function2<Integer, Integer,
> Integer>() {
> > @Override
> > public Integer call(Integer i1, Integer i2) {
> > return i1 + i2;
> > }
> > });
> >
> > wordCounts.print();
> > jssc.start();
> > jssc.awaitTermination();
> >
>
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by Tathagata Das <ta...@gmail.com>.
What is the Spark master that you are using. Use local[4], not local
if you are running locally.
On Mon, Nov 10, 2014 at 3:01 PM, Something Something
<ma...@gmail.com> wrote:
> I am embarrassed to admit but I can't get a basic 'word count' to work under
> Kafka/Spark streaming. My code looks like this. I don't see any word
> counts in console output. Also, don't see any output in UI. Needless to
> say, I am newbie in both 'Spark' as well as 'Kafka'.
>
> Please help. Thanks.
>
> Here's the code:
>
> public static void main(String[] args) {
> if (args.length < 4) {
> System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group>
> <topics> <numThreads>");
> System.exit(1);
> }
>
> // StreamingExamples.setStreamingLogLevels();
> // SparkConf sparkConf = new
> SparkConf().setAppName("JavaKafkaWordCount");
>
> // Location of the Spark directory
> String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>
> // URL of the Spark cluster
> String sparkUrl = "spark://mymachine:7077";
>
> // Location of the required JAR files
> String jarFiles =
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.setAppName("JavaKafkaWordCount");
> sparkConf.setJars(new String[]{jarFiles});
> sparkConf.setMaster(sparkUrl);
> sparkConf.set("spark.ui.port", "2348");
> sparkConf.setSparkHome(sparkHome);
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect", "myedgenode:2181");
> kafkaParams.put("group.id", "1");
> kafkaParams.put("metadata.broker.list", "myedgenode:9092");
> kafkaParams.put("serializer.class",
> "kafka.serializer.StringEncoder");
> kafkaParams.put("request.required.acks", "1");
>
> // Create the context with a 1 second batch size
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> Duration(2000));
>
> int numThreads = Integer.parseInt(args[3]);
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> String[] topics = args[2].split(",");
> for (String topic: topics) {
> topicMap.put(topic, numThreads);
> }
>
> // JavaPairReceiverInputDStream<String, String> messages =
> // KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
> JavaPairDStream<String, String> messages =
> KafkaUtils.createStream(jssc,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams,
> topicMap,
> StorageLevel.MEMORY_ONLY_SER());
>
>
> JavaDStream<String> lines = messages.map(new Function<Tuple2<String,
> String>, String>() {
> @Override
> public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
> }
> });
>
> JavaDStream<String> words = lines.flatMap(new
> FlatMapFunction<String, String>() {
> @Override
> public Iterable<String> call(String x) {
> return Lists.newArrayList(SPACE.split(x));
> }
> });
>
> JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> new PairFunction<String, String, Integer>() {
> @Override
> public Tuple2<String, Integer> call(String s) {
> return new Tuple2<String, Integer>(s, 1);
> }
> }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> @Override
> public Integer call(Integer i1, Integer i2) {
> return i1 + i2;
> }
> });
>
> wordCounts.print();
> jssc.start();
> jssc.awaitTermination();
>
Re: JavaKafkaWordCount not working under Spark Streaming
Posted by Tathagata Das <ta...@gmail.com>.
What is the Spark master that you are using. Use local[4], not local
if you are running locally.
On Mon, Nov 10, 2014 at 3:01 PM, Something Something
<ma...@gmail.com> wrote:
> I am embarrassed to admit but I can't get a basic 'word count' to work under
> Kafka/Spark streaming. My code looks like this. I don't see any word
> counts in console output. Also, don't see any output in UI. Needless to
> say, I am newbie in both 'Spark' as well as 'Kafka'.
>
> Please help. Thanks.
>
> Here's the code:
>
> public static void main(String[] args) {
> if (args.length < 4) {
> System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group>
> <topics> <numThreads>");
> System.exit(1);
> }
>
> // StreamingExamples.setStreamingLogLevels();
> // SparkConf sparkConf = new
> SparkConf().setAppName("JavaKafkaWordCount");
>
> // Location of the Spark directory
> String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>
> // URL of the Spark cluster
> String sparkUrl = "spark://mymachine:7077";
>
> // Location of the required JAR files
> String jarFiles =
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.setAppName("JavaKafkaWordCount");
> sparkConf.setJars(new String[]{jarFiles});
> sparkConf.setMaster(sparkUrl);
> sparkConf.set("spark.ui.port", "2348");
> sparkConf.setSparkHome(sparkHome);
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect", "myedgenode:2181");
> kafkaParams.put("group.id", "1");
> kafkaParams.put("metadata.broker.list", "myedgenode:9092");
> kafkaParams.put("serializer.class",
> "kafka.serializer.StringEncoder");
> kafkaParams.put("request.required.acks", "1");
>
> // Create the context with a 1 second batch size
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> Duration(2000));
>
> int numThreads = Integer.parseInt(args[3]);
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> String[] topics = args[2].split(",");
> for (String topic: topics) {
> topicMap.put(topic, numThreads);
> }
>
> // JavaPairReceiverInputDStream<String, String> messages =
> // KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
> JavaPairDStream<String, String> messages =
> KafkaUtils.createStream(jssc,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams,
> topicMap,
> StorageLevel.MEMORY_ONLY_SER());
>
>
> JavaDStream<String> lines = messages.map(new Function<Tuple2<String,
> String>, String>() {
> @Override
> public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
> }
> });
>
> JavaDStream<String> words = lines.flatMap(new
> FlatMapFunction<String, String>() {
> @Override
> public Iterable<String> call(String x) {
> return Lists.newArrayList(SPACE.split(x));
> }
> });
>
> JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> new PairFunction<String, String, Integer>() {
> @Override
> public Tuple2<String, Integer> call(String s) {
> return new Tuple2<String, Integer>(s, 1);
> }
> }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> @Override
> public Integer call(Integer i1, Integer i2) {
> return i1 + i2;
> }
> });
>
> wordCounts.print();
> jssc.start();
> jssc.awaitTermination();
>
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org