You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/10/01 09:11:48 UTC

[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3965: [CARBONDATA-4016] NPE and FileNotFound in Show Segments and Insert Stage

Indhumathi27 commented on a change in pull request #3965:
URL: https://github.com/apache/carbondata/pull/3965#discussion_r498084784



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -96,20 +100,31 @@ object CarbonStore {
    * Read stage files and return input files
    */
   def readStageInput(
+      tableStagePath: String,
       stageFiles: Seq[CarbonFile],
       status: StageInput.StageStatus): Seq[StageInput] = {
     val gson = new Gson()
     val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
     stageFiles.map { stage =>

Review comment:
       Can use `foreach` instead of `map`, since the result of the mapping is not used.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -96,20 +100,31 @@ object CarbonStore {
    * Read stage files and return input files
    */
   def readStageInput(
+      tableStagePath: String,
       stageFiles: Seq[CarbonFile],
       status: StageInput.StageStatus): Seq[StageInput] = {
     val gson = new Gson()
     val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
     stageFiles.map { stage =>
-      val filePath = stage.getAbsolutePath
-      val stream = FileFactory.getDataInputStream(filePath)
+      val filePath = tableStagePath + CarbonCommonConstants.FILE_SEPARATOR + stage.getName
+      var stream: DataInputStream = null
       try {
-        val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
-        stageInput.setCreateTime(stage.getLastModifiedTime)
-        stageInput.setStatus(status)
-        output.add(stageInput)
+        stream = FileFactory.getDataInputStream(filePath)
+        var retry = READ_FILE_RETRY_TIMES
+        breakable { while (retry > 0) { try {
+          val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
+          stageInput.setCreateTime(stage.getLastModifiedTime)
+          stageInput.setStatus(status)
+          output.add(stageInput)
+        } catch {
+          case _ : FileNotFoundException => breakable()

Review comment:
       Should we add a log message when the file is not found, instead of silently breaking out of the loop?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -96,20 +100,31 @@ object CarbonStore {
    * Read stage files and return input files
    */
   def readStageInput(
+      tableStagePath: String,
       stageFiles: Seq[CarbonFile],
       status: StageInput.StageStatus): Seq[StageInput] = {
     val gson = new Gson()
     val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
     stageFiles.map { stage =>
-      val filePath = stage.getAbsolutePath
-      val stream = FileFactory.getDataInputStream(filePath)
+      val filePath = tableStagePath + CarbonCommonConstants.FILE_SEPARATOR + stage.getName
+      var stream: DataInputStream = null
       try {
-        val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
-        stageInput.setCreateTime(stage.getLastModifiedTime)
-        stageInput.setStatus(status)
-        output.add(stageInput)
+        stream = FileFactory.getDataInputStream(filePath)
+        var retry = READ_FILE_RETRY_TIMES
+        breakable { while (retry > 0) { try {
+          val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
+          stageInput.setCreateTime(stage.getLastModifiedTime)
+          stageInput.setStatus(status)
+          output.add(stageInput)

Review comment:
       should break from the loop, once the stage file is found

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,13 +479,23 @@ case class CarbonInsertFromStageCommand(
     stageFiles.map { stage =>
       executorService.submit(new Runnable {
         override def run(): Unit = {
-          val filePath = stage._1.getAbsolutePath
-          val stream = FileFactory.getDataInputStream(filePath)
+          val filePath = tableStagePath + CarbonCommonConstants.FILE_SEPARATOR + stage._1.getName
+          var stream: DataInputStream = null
           try {
-            val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
-            output.add(stageInput)
+            stream = FileFactory.getDataInputStream(filePath)
+            var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
+            breakable (while (retry > 0) try {

Review comment:
       Please format the code.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -96,20 +100,31 @@ object CarbonStore {
    * Read stage files and return input files
    */
   def readStageInput(
+      tableStagePath: String,
       stageFiles: Seq[CarbonFile],
       status: StageInput.StageStatus): Seq[StageInput] = {
     val gson = new Gson()
     val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
     stageFiles.map { stage =>
-      val filePath = stage.getAbsolutePath
-      val stream = FileFactory.getDataInputStream(filePath)
+      val filePath = tableStagePath + CarbonCommonConstants.FILE_SEPARATOR + stage.getName
+      var stream: DataInputStream = null
       try {
-        val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
-        stageInput.setCreateTime(stage.getLastModifiedTime)
-        stageInput.setStatus(status)
-        output.add(stageInput)
+        stream = FileFactory.getDataInputStream(filePath)
+        var retry = READ_FILE_RETRY_TIMES
+        breakable { while (retry > 0) { try {

Review comment:
       Please format the code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org