Posted to user@beam.apache.org by "zhenglin.Tian" <zh...@cafintech.com> on 2017/05/11 04:14:33 UTC
Some issues with WindowedValue$TimestampedValueInSingleWindow
public class UnifiedDataExtraction {
    private static TupleTag<String> rawDataTag = new TupleTag<String>() {
    };
    private static TupleTag<String> riskEventLogTag = new TupleTag<String>() {
    };

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);
        // spark.executor.memory must be set on the SparkConf before the context is
        // created; jsc.setLocalProperty(...) only sets thread-local job properties.
        SparkConf sparkConf = new SparkConf()
                .setMaster("yarn-client")
                .setAppName("testBeam")
                .set("spark.executor.memory", "1024m");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
        options.setRunner(SparkRunner.class);
        options.setUsesProvidedSparkContext(true);
        options.setProvidedSparkContext(jsc);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);
        List<String> topics = Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));
        /**
         * Fetch data
         */
        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
                .withTopics(topics)
                .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withKeyDeserializer(VoidDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
        ).apply(Values.<String>create());
        PCollectionTuple results = rawData.apply("antifraudLog2Anatomy",
                ParDo.of(AnatomyAntifraudLogDoFn.of(ConstantsOwn.TRACING,
                        ConstantsOwn.ANTIFRAUD_DELIMITER,
                        riskEventLogTag
                )).withOutputTags(rawDataTag,
                        TupleTagList.of(riskEventLogTag)
                ));
        PCollection<String> rawDatas = results.get(rawDataTag).setCoder(StringUtf8Coder.of());
        //rawDatas.apply("rawDatas", ParDo.of(SimpleViewDoFn.of(true)));
        rawDatas.apply("rawDatas2doc", ParDo.of(ToMongoDBDocDoFn2.of(ConstantsOwn.ANTIFRAUD_DELIMITER)))
                .apply("rawDatas2mongodb", ToMongoDBPTransformUtil.of(ConstantsOwn.MONGODB_ADDRESS,
                        ConstantsOwn.DB_ANTIFRAUD_ANATOMY,
                        ConstantsOwn.COLLECTION_ANTIFRAUD_RAWDATA));
        // exceptionTag is not declared in this class; riskEventLogTag is the only
        // additional output tag registered with withOutputTags above.
        PCollection<String> exceptionDatas = results.get(riskEventLogTag);
        exceptionDatas.apply("exceptionDatas2doc", ParDo.of(ToMongoDBDocDoFn1.of()))
                .apply("exceptionDatas2mongodb", ToMongoDBPTransformUtil.of(ConstantsOwn.MONGODB_ADDRESS,
                        ConstantsOwn.DB_ANTIFRAUD_ANATOMY,
                        ConstantsOwn.COLLECTION_ANTIFRAUD_EXCEPTIONDATA));
        p.run().waitUntilFinish();
    }
}
When I run this code, I get an exception:
java.io.NotSerializableException: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow
Serialization stack:
- object not serializable (class: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow, value: TimestampedValueInSingleWindow{value={"et":0,"idCooper":"83babd4973c24c9facadaa1d1b4e6aa2","supplyParams"
.....
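Some background on this error, as a hedged sketch: `java.io.NotSerializableException` is thrown by `java.io.ObjectOutputStream` when it reaches an object whose class does not implement `java.io.Serializable`. The message shows that `WindowedValue$TimestampedValueInSingleWindow` is such a class. Beam normally encodes elements with coders rather than Java serialization, so the error suggests that Spark tried to Java-serialize windowed values directly. The plain-Java program below reproduces only the mechanism, using a hypothetical stand-in class `ValueInWindow` (not Beam's actual class) and a hypothetical `encode` helper that turns the value into bytes first, roughly the way a coder would.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class NotSerializableDemo {

    // Hypothetical stand-in for a windowed element wrapper. Like the class in
    // the stack trace, it does NOT implement java.io.Serializable.
    static final class ValueInWindow {
        final String value;
        final long timestampMillis;

        ValueInWindow(String value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Hypothetical serializable holder for an element already encoded to bytes.
    static final class EncodedValue implements Serializable {
        private static final long serialVersionUID = 1L;
        final byte[] bytes;

        EncodedValue(byte[] bytes) {
            this.bytes = bytes;
        }
    }

    // Returns true if Java serialization succeeds, false on NotSerializableException.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical encoder: keeps only the value, as UTF-8 bytes (timestamp omitted).
    static EncodedValue encode(ValueInWindow v) {
        return new EncodedValue(v.value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ValueInWindow v = new ValueInWindow("{\"et\":0}", 0L);
        System.out.println(isJavaSerializable(v));          // false
        System.out.println(isJavaSerializable(encode(v)));  // true
    }
}
```

In Beam itself the encoding step belongs to the runner and the element's coder, so this only illustrates why a raw windowed value cannot cross a Java-serialization boundary.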
Here are my Apache Beam dependencies:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.7.0-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
Sent from Mailbird [http://www.getmailbird.com/?utm_source=Mailbird&utm_medium=email&utm_campaign=sent-from-mailbird]
On 2017/5/10 19:17:21, zhenglin.Tian <zh...@cafintech.com> wrote:
Hi Aviem Zur,
I have an issue with Apache Beam on the SparkRunner. I read data from Kafka and apply some logic, then get an exception:
2017-05-10 19:02:11,729 [task-result-getter-3] [org.apache.spark.scheduler.TaskSetManager] [ERROR] - Task 0.0 in stage 2341.0 (TID 2227) had a not serializable result: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
Serialization stack:
- object not serializable (class: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: TimestampedValueInGlobalWindow{value={"et":0,"idCooper":"83babd4973c24c9facadaa1d1b4e6aa2","supplyParams":[{"name":"loc","val":"39.915832,116.428342"},{"name":"amount","val":5001}
My Apache Beam dependencies are:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.7.0-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
On 2017/5/4 14:13:03, zhenglin.Tian <zh...@cafintech.com> wrote:
Yes, when I tried to get the tagged output from the result, I encountered a 'NullPointerException'.
I pulled the latest version and rebuilt my project, and now it runs OK!
Thank you!
On 2017/5/4 12:11:01, Aviem Zur <av...@gmail.com> wrote:
By "cannot run normally" do you mean you get an exception? We recently had a bug on master in which streaming pipelines containing `ParDo` with multiple outputs ran into `NullPointerException`. This was fixed here: https://issues.apache.org/jira/browse/BEAM-2029
Is this what you're facing? If so does pulling master and rebuilding help?
On Thu, May 4, 2017 at 5:37 AM zhenglin.Tian <zhenglin.tian@cafintech.com> wrote:
Hi, I have trouble with additional outputs on the SparkRunner.
Here is my code. When I use the DirectRunner, everything runs OK, but if I replace the DirectRunner with the SparkRunner, the code doesn't run normally.
public class UnifiedDataExtraction {
    private static TupleTag<String> rawDataTag = new TupleTag<String>() {
    };
    private static TupleTag<String> exceptionTag = new TupleTag<String>() {
    };

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);
        SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
        options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
        options.setRunner(SparkRunner.class);
        // options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName(ConstantsOwn.SPARK_APPNAME);
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);
        List<String> topics = Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));
        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
                .withTopics(topics)
                //.withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withKeyDeserializer(VoidDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
        ).apply(Values.<String>create());
        rawData.apply(ParDo.of(SimpleViewDoFn.of(true))); // simply prints each element of rawData; runs normally ①
        PCollectionTuple results = rawData.apply("logAnatomyTest", // ②
                ParDo.of(
                        new DoFn<String, String>() {
                            @ProcessElement
                            public void process(ProcessContext c) {
                                String element = c.element();
                                System.out.println("====" + element);
                                if (!element.equals("EOF")) {
                                    c.output(c.element());
                                }
                            }
                        }
                ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
        );
        p.run().waitUntilFinish();
    }
}
In the code above, the step marked ① runs normally, but with ② I don't get any output.
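Every snippet in this thread declares its tags as `new TupleTag<String>() {}`. The trailing `{}` creates an anonymous subclass, which keeps the `String` type argument recoverable at runtime through reflection; Beam can use this captured type to infer a coder for an additional output. The sketch below illustrates the idiom with plain Java reflection and a hypothetical `Tag<T>` class; it is not Beam's `TupleTag` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeTokenDemo {

    // Hypothetical tag class (a stand-in, not Beam's TupleTag). It tries to
    // recover its own type argument from its generic superclass.
    static class Tag<T> {
        Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                // Anonymous subclass: "Tag<String>" is recorded as the superclass.
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            // Plain instance: the superclass is Object and T has been erased.
            return null;
        }
    }

    public static void main(String[] args) {
        Tag<String> plain = new Tag<String>();
        Tag<String> anonymous = new Tag<String>() {};
        System.out.println(plain.capturedType());      // null
        System.out.println(anonymous.capturedType());  // class java.lang.String
    }
}
```

When the type cannot be recovered, an explicit `setCoder(...)` on the output `PCollection`, as done for `rawDatas` in the first snippet, is the usual fallback.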
Here is my Beam version:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.7.0-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
Could someone please help me?
On 2017/4/28 4:43:23, Aviem Zur <aviemzur@gmail.com> wrote:
Yes. Spark streaming support is still experimental, and this issue exists in Beam 0.6.0.
This has since been fixed, and the fix will be part of the upcoming release.
Since this isn't the first time a user has encountered this, I've created a JIRA ticket to give the issue better visibility: https://issues.apache.org/jira/browse/BEAM-2106
Thanks for reaching out! Please feel free to try out your pipeline using the Beam master branch or one of the nightly SNAPSHOT builds.
On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498237@qq.com> wrote:
Here is my maven configuration, thank you.
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.6.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.6.0</version>
</dependency>
On 26 Apr 2017, at 6:58 PM, Aviem Zur <aviemzur@gmail.com> wrote:
Hi,
Can you please share which version of Beam you are using?
On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498237@qq.com> wrote:
Hi, here is my program that uses additional outputs in Apache Beam, and the result:
public class DataExtraction2 {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setSparkMaster("local[4]");
        // options.setCheckpointDir("./checkpoint");
        options.setRunner(SparkRunner.class);
        // options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName("testMavenDependency");
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);
        List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
        final TupleTag<String> rawDataTag = new TupleTag<String>() {
        };
        final TupleTag<String> exceptionTag = new TupleTag<String>() {
        };
        final TupleTag<String> riskEventLogTag = new TupleTag<String>() {
        };
        final TupleTag<String> statisticsTag = new TupleTag<String>() {
        };
        final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {
        };
        final TupleTag<String> equipmentLogTag = new TupleTag<String>() {
        };
        final TupleTag<String> performanceLogTag = new TupleTag<String>() {
        };
        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
                .withTopics(topics)
                .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata()
        ).apply(Values.<String>create());
        PCollectionTuple results = rawData.apply(
                ParDo.withOutputTags(rawDataTag,
                        TupleTagList.of(exceptionTag)
                                .and(riskEventLogTag)
                                .and(statisticsTag)
                                .and(errorTargetLogTag)
                                .and(equipmentLogTag)
                                .and(performanceLogTag))
                        .of(new DoFn<String, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                String idCoop = "";
                                int eventType = 0;
                                int osPlatformType = -1;
                                String innerDecision = "";
                                String outterDecision = "";
                                // Date appTime = new Date();
                                String eventId = "";
                                //String strategyList = "";
                                String uuid = "";
                                String phoneNo = "";
                                int equipmentType = -1;
                                int antiFraudTime = -1;
                                ......
                            }
                        }));
        p.run().waitUntilFinish();
    }
}
When I run this program, I get this result:
.....
....
2017-04-26 15:06:13,077 [pool-1-thread-1] [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.beam.runners.spark.translation.EvaluationContext
Serialization stack:
- object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
- field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
- object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
- field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
- field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
...
....
If there is only one (main) output, the program works OK.
Can you tell me why?
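A note on reading the serialization stack above: the failing object is an anonymous class (`StreamingTransformTranslator$9$1`) whose field `val$context` holds the non-serializable `EvaluationContext`. `val$...` is the name javac gives the synthetic field that stores a local variable captured by an anonymous inner class. As the error message says, once DStream checkpointing is enabled the DStreams and their functions must be serializable, so one captured non-serializable local is enough to fail the whole graph. A plain-Java sketch of that mechanism, with hypothetical `Context` and `SerializableFunction` types (not Beam or Spark classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class CapturedContextDemo {

    // Hypothetical stand-in for a driver-side object such as an evaluation
    // context. It does not implement Serializable.
    static final class Context {
        final String name;

        Context(String name) {
            this.name = name;
        }
    }

    // Hypothetical function type that must be serializable to be checkpointed.
    interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}

    // Returns true if Java serialization succeeds, false on NotSerializableException.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The anonymous class captures 'context'; javac stores it in a synthetic
    // field named val$context, so serializing the function fails.
    static SerializableFunction<String, String> capturingContext(Context context) {
        return new SerializableFunction<String, String>() {
            @Override
            public String apply(String s) {
                return context.name + ":" + s;
            }
        };
    }

    // Copying out only the serializable value avoids capturing Context at all.
    static SerializableFunction<String, String> capturingOnlyName(Context context) {
        String name = context.name;
        return new SerializableFunction<String, String>() {
            @Override
            public String apply(String s) {
                return name + ":" + s;
            }
        };
    }

    public static void main(String[] args) {
        Context context = new Context("driver");
        System.out.println(isJavaSerializable(capturingContext(context)));   // false
        System.out.println(isJavaSerializable(capturingOnlyName(context)));  // true
    }
}
```

This only illustrates the Java mechanism behind the trace; the capture itself sits inside the Spark runner's translator, which is why upgrading Beam (rather than changing the user's DoFn) was the suggested fix.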