Posted to issues@carbondata.apache.org by "PRIYESH RANJAN (Jira)" <ji...@apache.org> on 2021/08/26 06:39:00 UTC

[jira] [Created] (CARBONDATA-4276) writestream fail when csv is copied to readstream hdfs path

PRIYESH RANJAN created CARBONDATA-4276:
------------------------------------------

             Summary: writestream fail when csv is copied to readstream hdfs path
                 Key: CARBONDATA-4276
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-4276
             Project: CarbonData
          Issue Type: Bug
          Components: data-load
    Affects Versions: 2.2.0
         Environment: Spark 2.4.5
            Reporter: PRIYESH RANJAN


*Steps:*

*+In HDFS, execute the following commands:+*

cd /opt/HA/C10/install/hadoop/datanode/bin/
./hdfs dfs -rm -r /tmp/stream_test/checkpoint_all_data
./hdfs dfs -mkdir -p /tmp/stream_test/{checkpoint_all_data,bad_records_all_data}
./hdfs dfs -mkdir -p /Priyesh/streaming/csv/
./hdfs dfs -cp /chetan/100_olap_C20.csv /Priyesh/streaming/csv/

./hdfs dfs -cp /Priyesh/streaming/csv/100_olap_C20.csv /Priyesh/streaming/csv/100_olap_C21.csv

 

*+From Spark-beeline / Spark-sql / Spark-shell, execute:+*

DROP TABLE IF EXISTS all_datatypes_2048;
create table all_datatypes_2048 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) stored as carbondata TBLPROPERTIES('table_blocksize'='2048','streaming'='true', 'sort_columns'='imei');

 

*+From Spark-shell, execute:+*

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

val df_j=spark.readStream.text("hdfs://hacluster/Priyesh/streaming/csv/*.csv")

df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation", "hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start

show segments for table all_datatypes_2048;

 

*Issue 1:*

*+When a CSV file is copied into the HDFS folder for the first time after streaming has started, writeStream fails with the error:+*

scala> df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation", "hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start
21/08/26 12:53:11 WARN CarbonProperties: The enable mv value "null" is invalid. Using the default value "true"
21/08/26 12:53:11 WARN CarbonProperties: The value "LOCALLOCK" configured for key carbon.lock.type is invalid for current file system. Use the default value HDFSLOCK instead.
21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled does not exist
21/08/26 12:53:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@ad038f8

scala> 21/08/26 13:00:49 WARN DFSClient: DataStreamer Exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK], DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]], original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK], DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
 at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:925)
 at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:988)
 at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1156)
 at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
21/08/26 13:00:49 ERROR CarbonUtil: Error while closing stream:java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK], DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]], original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK], DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
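
The DataStreamer exception above lists only two datanodes in the write pipeline, and the message itself points at 'dfs.client.block.write.replace-datanode-on-failure.policy'. The commands below are a minimal diagnostic sketch, assuming the cluster has too few live datanodes to supply a replacement (not confirmed in this report) and that spark-shell is on the PATH. Setting the policy to NEVER is only meant to test that assumption, not as a recommended fix.

```shell
# Assumption: same Hadoop bin directory as in the steps above.
cd /opt/HA/C10/install/hadoop/datanode/bin/

# Show how many datanodes the NameNode currently reports as live.
./hdfs dfsadmin -report

# Show the replication factor and the replacement policy resolved from the configuration.
./hdfs getconf -confKey dfs.replication
./hdfs getconf -confKey dfs.client.block.write.replace-datanode-on-failure.policy

# Relaunch spark-shell with the policy relaxed for this client only.
# Spark copies spark.hadoop.* properties into the Hadoop Configuration it builds.
spark-shell --conf spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy=NEVER
```

If issue 1 no longer appears with this setting, the failure is probably related to the datanode count rather than to the CSV copy itself.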

*Issue 2:*

*+When a CSV file is copied into the HDFS folder for the second time after streaming has started, writeStream fails with:+*

 

21/08/26 13:01:36 ERROR StreamSegment: Failed to append batch data to stream segment: hdfs://hacluster/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
 at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
 at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
 at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
 at org.apache.hadoop.ipc.Client.call(Client.java:1412)
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
 at com.sun.proxy.$Proxy17.append(Unknown Source)
 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy18.append(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
 at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
 at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
 at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
 at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
 at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
 at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
 at org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
 at org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
 at org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
21/08/26 13:01:36 ERROR Utils: Aborting task
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
 at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
 at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
 at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
 at org.apache.hadoop.ipc.Client.call(Client.java:1412)
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
 at com.sun.proxy.$Proxy17.append(Unknown Source)
 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy18.append(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
 at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
 at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
 at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
 at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
 at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
 at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
 at org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
 at org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
 at org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
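
The AlreadyBeingCreatedException above says the same DFSClient still holds the lease on the stream file. One possible reading, not verified here, is that the failed stream close in issue 1 left the file open for write. A minimal sketch for checking this from the HDFS side, using the file path from the log above:

```shell
# Assumption: same Hadoop bin directory as in the steps above.
cd /opt/HA/C10/install/hadoop/datanode/bin/

# List files under the stream segment that HDFS still considers open for write.
./hdfs fsck /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0 -files -openforwrite

# Ask the NameNode to recover the lease on the stuck file.
# Stop the streaming query first; forcing recovery under a live writer may interfere with it.
./hdfs debug recoverLease -path /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata -retries 3
```

If fsck reports the file as open for write after the first failure, that would suggest issue 2 is a follow-on of issue 1 rather than a separate defect.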


