You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:53 UTC

[28/50] [abbrv] carbondata git commit: [CARBONDATA-2520] Clean and close datamap writers on any task failure during load

[CARBONDATA-2520] Clean and close datamap writers on any task failure during load

Problem: The datamap writers registered to the listener are closed or finished only when the load succeeds, not when it fails. While testing Lucene, it was found that after a task fails the writer is not closed, so the write.lock file written in the Lucene index folder still exists; when the next task tries to write an index in the same directory, it fails with a "lock file already exists" error.

Solution: close the writers if any load task fails.

This closes #2321


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7f4bd3d0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7f4bd3d0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7f4bd3d0

Branch: refs/heads/spark-2.3
Commit: 7f4bd3d06517b551aa14a1054dffa9ae7ca9ad57
Parents: 1b6ce8c
Author: akashrn5 <ak...@gmail.com>
Authored: Thu May 17 11:37:22 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Mon May 28 21:27:25 2018 +0800

----------------------------------------------------------------------
 .../core/datamap/dev/DataMapWriter.java         | 10 ++++++++
 .../datamap/bloom/BloomDataMapWriter.java       |  9 +++++---
 .../datamap/lucene/LuceneDataMapWriter.java     | 12 ++++++----
 .../loading/AbstractDataLoadProcessorStep.java  | 24 ++++++++++++++++++++
 .../CarbonRowDataWriterProcessorStepImpl.java   |  4 +++-
 .../steps/DataWriterBatchProcessorStepImpl.java |  4 +++-
 .../steps/DataWriterProcessorStepImpl.java      | 23 +++++++++++++++++--
 .../store/CarbonFactDataHandlerModel.java       | 22 ++++++++++--------
 8 files changed, 87 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 03a369a..89d5d76 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -46,6 +46,8 @@ public abstract class DataMapWriter {
 
   private List<CarbonColumn> indexColumns;
 
+  private boolean isWritingFinished;
+
   public DataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
       Segment segment, String shardName) {
     this.tablePath = tablePath;
@@ -133,4 +135,12 @@ public abstract class DataMapWriter {
       String tablePath, String segmentId, String dataMapName) {
     return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
   }
+
+  public boolean isWritingFinished() {
+    return isWritingFinished;
+  }
+
+  public void setWritingFinished(boolean writingFinished) {
+    isWritingFinished = writingFinished;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index b3e69f4..2791a6c 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -196,10 +196,13 @@ public class BloomDataMapWriter extends DataMapWriter {
 
   @Override
   public void finish() throws IOException {
-    if (indexBloomFilters.size() > 0) {
-      writeBloomDataMapFile();
+    if (!isWritingFinished()) {
+      if (indexBloomFilters.size() > 0) {
+        writeBloomDataMapFile();
+      }
+      releaseResouce();
+      setWritingFinished(true);
     }
-    releaseResouce();
   }
 
   protected void releaseResouce() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index c7eb3d8..605ec89 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -447,10 +447,14 @@ public class LuceneDataMapWriter extends DataMapWriter {
    * class.
    */
   public void finish() throws IOException {
-    flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
-    // finished a file , close this index writer
-    if (indexWriter != null) {
-      indexWriter.close();
+    if (!isWritingFinished()) {
+      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
+      // finished a file , close this index writer
+      if (indexWriter != null) {
+        indexWriter.close();
+        indexWriter = null;
+      }
+      setWritingFinished(true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
index 9f2482b..eb02ede 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -25,8 +25,12 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 
 /**
  * This base abstract class for data loading.
@@ -149,6 +153,26 @@ public abstract class AbstractDataLoadProcessorStep {
    */
   protected abstract String getStepName();
 
+  /**
+   * This method registers all writer listeners and returns the listener
+   * @param bucketId bucketId
+   * @return
+   */
+  protected DataMapWriterListener getDataMapWriterListener(int bucketId) {
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Long.parseLong(configuration.getTaskNo()),
+            (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
+    DataMapWriterListener listener = new DataMapWriterListener();
+    listener.registerAllWriter(
+        configuration.getTableSpec().getCarbonTable(),
+        configuration.getSegmentId(),
+        CarbonTablePath.getShardName(
+            carbonDataFileAttributes.getTaskId(),
+            bucketId,
+            0,
+            String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+    return listener;
+  }
 
   /**
    * Close all resources.This method is called after execute() is finished.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index edf67a7..e465471 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -156,8 +157,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
     String[] storeLocation = getStoreLocation(tableIdentifier);
+    DataMapWriterListener listener = getDataMapWriterListener(0);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
-        configuration, storeLocation, 0, iteratorIndex);
+        configuration, storeLocation, 0, iteratorIndex, listener);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 369c1f2..78777ce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -85,8 +86,9 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
           CarbonRowBatch next = iterator.next();
           // If no rows from merge sorter, then don't create a file in fact column handler
           if (next.hasNext()) {
+            DataMapWriterListener listener = getDataMapWriterListener(0);
             CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-                .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++);
+                .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++, listener);
             CarbonFactHandler dataHandler = CarbonFactHandlerFactory
                 .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
             dataHandler.initialise();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index b09fb7d..a0f29fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -57,6 +58,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private long readCounter;
 
+  private DataMapWriterListener listener;
+
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
@@ -88,8 +91,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     CarbonTableIdentifier tableIdentifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     String[] storeLocation = getStoreLocation(tableIdentifier);
+    listener = getDataMapWriterListener(0);
     return CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration,
-        storeLocation, 0, 0);
+        storeLocation, 0, 0, listener);
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -162,8 +166,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
       CarbonTableIdentifier tableIdentifier, int rangeId) {
     String[] storeLocation = getStoreLocation(tableIdentifier);
 
+    listener = getDataMapWriterListener(rangeId);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-        .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0);
+        .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0, listener);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (insideRangeIterator.hasNext()) {
@@ -247,4 +252,18 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return null;
   }
 
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      if (listener != null) {
+        try {
+          LOGGER.info("closing all the DataMap writers registered to DataMap writer listener");
+          listener.finish();
+        } catch (IOException e) {
+          LOGGER.error(e, "error while closing the datamap writers");
+          // ignoring the exception
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f4bd3d0/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index a725936..87a6de0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -168,7 +168,7 @@ public class CarbonFactDataHandlerModel {
    */
   public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
       CarbonDataLoadConfiguration configuration, String[] storeLocation, int bucketId,
-      int taskExtension) {
+      int taskExtension, DataMapWriterListener listener) {
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     boolean[] isUseInvertedIndex =
@@ -258,15 +258,17 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
-    DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(
-        configuration.getTableSpec().getCarbonTable(),
-        configuration.getSegmentId(),
-        CarbonTablePath.getShardName(
-            carbonDataFileAttributes.getTaskId(),
-            bucketId,
-            0,
-            String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+    if (listener == null) {
+      listener = new DataMapWriterListener();
+      listener.registerAllWriter(
+          configuration.getTableSpec().getCarbonTable(),
+          configuration.getSegmentId(),
+          CarbonTablePath.getShardName(
+              carbonDataFileAttributes.getTaskId(),
+              bucketId,
+              0,
+              String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+    }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();