Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/11/05 01:20:36 UTC

carbondata git commit: [HOTFIX] Throw original exception in thread pool

Repository: carbondata
Updated Branches:
  refs/heads/master 934216de1 -> 469c52f5d


[HOTFIX] Throw original exception in thread pool

If an exception occurs in Callable.call() inside the thread pool, the original exception should be rethrown rather than wrapped in a new one, because the wrapper makes debugging harder; see the sketch below.

This closes #2887
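
For readers skimming the diff below, the core pattern is how ExecutionException is handled when draining a thread pool: Future.get() wraps whatever the Callable threw in an ExecutionException, so rethrowing the wrapper (or a new exception built from it) buries the real failure one level deep. Here is a minimal standalone Java sketch of the fixed pattern; it is illustrative only, not taken from the CarbonData sources, and the class and message names are made up:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PoolExceptionDemo {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<Void>> futures = new ArrayList<>();
        futures.add(pool.submit((Callable<Void>) () -> {
          throw new IllegalStateException("original failure");
        }));
        try {
          for (Future<Void> future : futures) {
            future.get();
          }
        } catch (InterruptedException e) {
          // The wait itself was interrupted; here the wrapper is the real story.
          throw new RuntimeException(e);
        } catch (ExecutionException e) {
          // e is only the pool's wrapper; the Callable's actual exception is its
          // cause. Rethrow the cause so the original type and stack trace stay
          // visible to whoever catches this.
          throw new RuntimeException("task failed", e.getCause());
        } finally {
          pool.shutdown();
        }
      }
    }

This mirrors the DataWriterProcessorStepImpl hunk at the end of the diff, which splits the combined InterruptedException | ExecutionException catch so that only the ExecutionException branch unwraps getCause().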


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/469c52f5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/469c52f5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/469c52f5

Branch: refs/heads/master
Commit: 469c52f5d4c18579e2a6ed4c3bb35691cf01937b
Parents: 934216d
Author: Jacky Li <ja...@qq.com>
Authored: Wed Oct 31 20:16:11 2018 +0800
Committer: xuchuanyin <xu...@hust.edu.cn>
Committed: Mon Nov 5 09:15:53 2018 +0800

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  2 +
 .../carbondata/spark/rdd/PartitionDropper.scala |  7 +-
 .../spark/rdd/PartitionSplitter.scala           |  4 +-
 .../steps/DataWriterProcessorStepImpl.java      | 68 ++++++--------------
 4 files changed, 27 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 041dc1c..0b6a2a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -156,6 +156,8 @@ class NewCarbonDataLoadRDD[K, V](
           logInfo("Bad Record Found")
         case e: Exception =>
           loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
+          executionErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+          executionErrors.errorMsg = e.getMessage
           logInfo("DataLoad failure", e)
           LOGGER.error(e)
           throw e
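
The hunk above follows a "record, then rethrow" shape: failure metadata is captured for the load status report, but the exception object itself is rethrown unchanged so its type and stack trace propagate intact. A hedged Java sketch of that shape, with a made-up status field standing in for executionErrors:

    public class RecordAndRethrowDemo {
      static String lastErrorMessage;  // hypothetical stand-in for executionErrors

      public static void main(String[] args) throws Exception {
        try {
          throw new IllegalArgumentException("bad record");  // stand-in failure
        } catch (Exception e) {
          // Record context for the status report first...
          lastErrorMessage = e.getMessage();
          // ...then rethrow the same object: no new wrapper, nothing lost.
          throw e;
        }
      }
    }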

http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index 6911b0b..353a478 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -102,8 +102,8 @@ object PartitionDropper {
               Seq(partitionId, targetPartitionId).toList, dbName,
               tableName, partitionInfo)
           } catch {
-            case e: IOException => sys.error(s"Exception while delete original carbon files " +
-                                             e.getMessage)
+            case e: IOException =>
+              throw new IOException("Exception while delete original carbon files ", e)
           }
           Audit.log(logger, s"Drop Partition request completed for table " +
                        s"${ dbName }.${ tableName }")
@@ -111,7 +111,8 @@ object PartitionDropper {
                       s"${ dbName }.${ tableName }")
         }
       } catch {
-        case e: Exception => sys.error(s"Exception in dropping partition action: ${ e.getMessage }")
+        case e: Exception =>
+          throw new RuntimeException("Exception in dropping partition action", e)
       }
     } else {
       PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, absoluteTableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index ca9f049..369ad51 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -87,8 +87,8 @@ object PartitionSplitter {
            deleteOriginalCarbonFile(alterPartitionModel, absoluteTableIdentifier,
              Seq(partitionId).toList, databaseName, tableName, partitionInfo)
        } catch {
-         case e: IOException => sys.error(s"Exception while delete original carbon files " +
-         e.getMessage)
+         case e: IOException =>
+           throw new IOException("Exception while delete original carbon files ", e)
        }
        Audit.log(logger, s"Add/Split Partition request completed for table " +
                     s"${ databaseName }.${ tableName }")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 1657476..2dc3275 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -122,7 +121,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
       // do this concurrently
       for (Iterator<CarbonRowBatch> iterator : iterators) {
         rangeExecutorServiceSubmitList.add(
-            rangeExecutorService.submit(new WriterForwarder(iterator, tableIdentifier, i)));
+            rangeExecutorService.submit(new WriterForwarder(iterator, i)));
         i++;
       }
       try {
@@ -131,15 +130,14 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         for (int j = 0; j < rangeExecutorServiceSubmitList.size(); j++) {
           rangeExecutorServiceSubmitList.get(j).get();
         }
-      } catch (InterruptedException | ExecutionException e) {
+      } catch (InterruptedException e) {
         throw new CarbonDataWriterException(e);
+      } catch (ExecutionException e) {
+        throw new CarbonDataWriterException(e.getCause());
       }
     } catch (CarbonDataWriterException e) {
-      LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
-      throw new CarbonDataLoadingException(
-          "Error while initializing data handler : " + e.getMessage());
+      throw new CarbonDataLoadingException("Error while initializing writer: " + e.getMessage(), e);
     } catch (Exception e) {
-      LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
       throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
     }
     return null;
@@ -154,19 +152,14 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
    */
   private final class WriterForwarder implements Callable<Void> {
     private Iterator<CarbonRowBatch> insideRangeIterator;
-    private CarbonTableIdentifier tableIdentifier;
     private int rangeId;
 
-    public WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator,
-        CarbonTableIdentifier tableIdentifier, int rangeId) {
+    WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) {
       this.insideRangeIterator = insideRangeIterator;
-      this.tableIdentifier = tableIdentifier;
       this.rangeId = rangeId;
     }
 
-    @Override public Void call() throws Exception {
-      LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName()
-          + ", range: " + rangeId);
+    @Override public Void call() {
       processRange(insideRangeIterator, rangeId);
       return null;
     }
@@ -184,8 +177,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     while (insideRangeIterator.hasNext()) {
       if (rowsNotExist) {
         rowsNotExist = false;
-        dataHandler = CarbonFactHandlerFactory
-            .createCarbonFactHandler(model);
+        dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
         carbonFactHandlers.add(dataHandler);
         dataHandler.initialise();
       }
@@ -201,12 +193,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     CarbonTableIdentifier tableIdentifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     String tableName = tableIdentifier.getTableName();
-
-    try {
-      dataHandler.finish();
-    } catch (Exception e) {
-      LOGGER.error("Failed for table: " + tableName + " in  finishing data handler", e);
-    }
+    dataHandler.finish();
     LOGGER.info("Record Processed For table: " + tableName);
     String logMessage =
         "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
@@ -222,41 +209,24 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
             System.currentTimeMillis());
   }
 
-  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
+  private void processingComplete(CarbonFactHandler dataHandler) {
     if (null != dataHandler) {
-      try {
-        dataHandler.closeHandler();
-      } catch (CarbonDataWriterException e) {
-        LOGGER.error(e.getMessage(), e);
-        throw new CarbonDataLoadingException(e.getMessage(), e);
-      } catch (Exception e) {
-        LOGGER.error(e.getMessage(), e);
-        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
-      }
+      dataHandler.closeHandler();
     }
   }
 
-  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
-      throws CarbonDataLoadingException {
-    try {
-      while (batch.hasNext()) {
-        CarbonRow row = batch.next();
-        dataHandler.addDataToStore(row);
-        readCounter++;
-      }
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) {
+    while (batch.hasNext()) {
+      CarbonRow row = batch.next();
+      dataHandler.addDataToStore(row);
+      readCounter++;
     }
     rowCounter.getAndAdd(batch.getSize());
   }
 
-  public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
-    try {
-      readCounter++;
-      dataHandler.addDataToStore(row);
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
+  public void processRow(CarbonRow row, CarbonFactHandler dataHandler) {
+    readCounter++;
+    dataHandler.addDataToStore(row);
     rowCounter.getAndAdd(1);
   }