You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by "Steve Loughran (JIRA)" <ji...@apache.org> on 2019/01/02 22:16:00 UTC
[jira] [Resolved] (HADOOP-16021) SequenceFile.createWriter
appendIfExists codec cause NullPointerException
[ https://issues.apache.org/jira/browse/HADOOP-16021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Steve Loughran resolved HADOOP-16021.
-------------------------------------
Resolution: Duplicate
> SequenceFile.createWriter appendIfExists codec cause NullPointerException
> -------------------------------------------------------------------------
>
> Key: HADOOP-16021
> URL: https://issues.apache.org/jira/browse/HADOOP-16021
> Project: Hadoop Common
> Issue Type: Bug
> Components: common
> Affects Versions: 2.7.3
> Environment: windows10 or Linux-centos , hadoop2.7.3, jdk8
> Reporter: asin
> Priority: Major
> Labels: bug
> Attachments: 055.png, 62.png, CompressionType.BLOCK-Not supported-error log.txt, CompressionType.NONE-NullPointerException-error log.txt
>
>
>
> I want append the data in a file , when i use SequenceFile.appendIfExists , it throw NullPointerException at at org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1119)
> when i remove the 'appendIfExists', it works, but it will cover old file.
>
> when i try use CompressionType.RECORD or CompressionType.BLOCK throw "not support" exception
>
> {code:java}
> // my code
> SequenceFile.Writer writer = null;
> writer = SequenceFile.createWriter(conf,
> SequenceFile.Writer.file(path),
> SequenceFile.Writer.keyClass(Text.class),
> SequenceFile.Writer.valueClass(Text.class),
> SequenceFile.Writer.appendIfExists(true) );
> {code}
>
> {code:java}
> // all my code
> public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>> {
> private static Configuration conf = new Configuration();
> private int MAX_LINE = 3; // little num,for test
> @Override
> public void call(Iterator<Tuple2<String, String>> iterator) throws Exception {
> int partitionId = TaskContext.get().partitionId();
> int count = 0;
> SequenceFile.Writer writer = null;
> while (iterator.hasNext()) {
> Tuple2<String, String> tp = iterator.next();
> Path path = new Path("D:/tmp-doc/logs/logs.txt");
> if (writer == null)
> writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
> SequenceFile.Writer.keyClass(Text.class),
> SequenceFile.Writer.valueClass(Text.class),
> SequenceFile.Writer.appendIfExists(true)
> );
> writer.append(new Text(tp._1), new Text(tp._2));
> count++;
> if (count > MAX_LINE) {
> IOUtils.closeStream(writer);
> count = 0;
> writer = SequenceFile.createWriter(... // same as above
> }
> }
> if (count > 0) {
> IOUtils.closeStream(writer);
> }
> IOUtils.closeStream(writer);
> }
> }
> {code}
> // above code call by below
> {code:java}
> import com.xxx.algo.hadoop.Writer1
> import com.xxx.algo.utils.Utils
> import kafka.serializer.StringDecoder
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.apache.spark.streaming.{Durations, StreamingContext}
> import org.apache.spark.{SparkConf, SparkContext}
> object KafkaSparkStreamingApp {
> def main(args: Array[String]): Unit = {
> val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
> val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
> val topics = Set("test.aries.collection.appevent.biz")
> val tag = "biz"
> val durationSeconds = 5000
> val conf = new SparkConf()
> conf.setAppName("user-log-consumer")
> .set("spark.serilizer","org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrationRequired", "true")
> .set("spark.defalut.parallelism","2")
> .set("spark.rdd.compress","true")
> .setMaster("local[2]")
> val sc = new SparkContext(conf)
> val session = SparkSession.builder()
> .config(conf)
> .getOrCreate()
> val ssc = new StreamingContext(sc, Durations.milliseconds(durationSeconds))
> val kafkaParams = Map[String, String](
> "metadata.broker.list" -> kafka,
> "bootstrap.servers" -> kafka,
> "zookeeper.connect" -> zk,
> "group.id" -> "recommend_stream_spark",
> "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
> "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
> "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
> )
> val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
> ssc,
> kafkaParams,
> topics
> )
> val timeFieldName = "log_time"
> stream.foreachRDD(rddMsg => {
> rddMsg.map(msg => {
> val value = msg._2
> val time = Utils.getTime(value, timeFieldName)
> new Tuple2(time + "," + tag, value)
> })
> .toJavaRDD().foreachPartition(new Writer1()) // here
> })
> ssc.start()
> ssc.awaitTermination()
> }
> }
> {code}
> {{more info see:[https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception]}}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org