You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/27 04:13:55 UTC

carbondata git commit: [CARBONDATA-1934] Incorrect results are returned by select query in case when the number of blocklets for one part file are > 1 in the same task

Repository: carbondata
Updated Branches:
  refs/heads/master 525920c25 -> f635106a7


[CARBONDATA-1934] Incorrect results are returned by select query in case when the number of blocklets for one part file are > 1 in the same task

Problem: When a select query is triggered, the driver will prune the segments and produce a list of blocklets that need to be scanned. The number of tasks from Spark will be equal to the number of blocklets identified.
In cases where one task has more than one blocklet for the same file, the BlockExecutionInfo being formed is incorrect. Because of this, the query results are incorrect.

Fix: Use the abstract index to fill in all the details in BlockExecutionInfo.

This closes #1715


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f635106a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f635106a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f635106a

Branch: refs/heads/master
Commit: f635106a7c8af4a4ae2f1899d4a26ccbd43958d1
Parents: 525920c
Author: manishgupta88 <to...@gmail.com>
Authored: Fri Dec 22 16:05:31 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Dec 27 09:43:23 2017 +0530

----------------------------------------------------------------------
 .../BlockletDataRefNodeWrapper.java              |  4 ++++
 .../executor/impl/AbstractQueryExecutor.java     | 14 ++++++++------
 .../hadoop/ft/CarbonInputMapperTest.java         | 19 ++++++++++++-------
 .../hadoop/test/util/StoreCreator.java           |  8 ++++++++
 4 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f635106a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
index 42505ad..b1331bf 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -160,4 +160,8 @@ public class BlockletDataRefNodeWrapper implements DataRefNode {
   public int numberOfNodes() {
     return blockInfos.size();
   }
+
+  public List<TableBlockInfo> getBlockInfos() {
+    return blockInfos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f635106a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index cb193e4..03bc50f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
 import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -217,12 +218,13 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // query
     // and query will be executed based on that infos
     for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
-      blockExecutionInfoList.add(
-          getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i),
-              queryModel.getTableBlockInfos().get(i).getBlockletInfos().getStartBlockletNumber(),
-              queryModel.getTableBlockInfos().get(i).getBlockletInfos().getNumberOfBlockletToScan(),
-              queryModel.getTableBlockInfos().get(i).getFilePath(),
-              queryModel.getTableBlockInfos().get(i).getDeletedDeltaFilePath()));
+      AbstractIndex abstractIndex = queryProperties.dataBlocks.get(i);
+      BlockletDataRefNodeWrapper dataRefNode =
+          (BlockletDataRefNodeWrapper) abstractIndex.getDataRefNode();
+      blockExecutionInfoList.add(getBlockExecutionInfoForBlock(queryModel, abstractIndex,
+          dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(),
+          dataRefNode.numberOfNodes(), dataRefNode.getBlockInfos().get(0).getFilePath(),
+          dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath()));
     }
     if (null != queryModel.getStatisticsRecorder()) {
       QueryStatistic queryStatistic = new QueryStatistic();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f635106a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index ff17ca3..a19098a 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -28,12 +28,12 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.test.util.StoreCreator;
 
 import junit.framework.TestCase;
@@ -56,7 +56,6 @@ public class CarbonInputMapperTest extends TestCase {
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
-
   }
 
   @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception {
@@ -77,6 +76,8 @@ public class CarbonInputMapperTest extends TestCase {
       e.printStackTrace();
       Assert.assertTrue("failed", false);
       throw e;
+    } finally {
+      StoreCreator.clearDataMaps();
     }
   }
 
@@ -94,6 +95,8 @@ public class CarbonInputMapperTest extends TestCase {
     } catch (Exception e) {
       e.printStackTrace();
       Assert.assertTrue("failed", false);
+    } finally {
+      StoreCreator.clearDataMaps();
     }
   }
 
@@ -112,6 +115,8 @@ public class CarbonInputMapperTest extends TestCase {
       Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
     } catch (Exception e) {
       Assert.assertTrue("failed", false);
+    } finally {
+      StoreCreator.clearDataMaps();
     }
   }
 
@@ -194,18 +199,18 @@ public class CarbonInputMapperTest extends TestCase {
     job.setOutputValueClass(IntWritable.class);
     job.setMapperClass(Map.class);
     //    job.setReducerClass(WordCountReducer.class);
-    job.setInputFormatClass(CarbonInputFormat.class);
+    job.setInputFormatClass(CarbonTableInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
     AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier();
     if (projection != null) {
-      CarbonInputFormat.setColumnProjection(job.getConfiguration(), projection);
+      CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), projection);
     }
     if (filter != null) {
-      CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+      CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter);
     }
-    CarbonInputFormat.setDatabaseName(job.getConfiguration(),
+    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
         abs.getCarbonTableIdentifier().getDatabaseName());
-    CarbonInputFormat.setTableName(job.getConfiguration(),
+    CarbonTableInputFormat.setTableName(job.getConfiguration(),
         abs.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(abs.getTablePath()));
     CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1"));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f635106a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index d3fd087..fc54238 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
@@ -173,6 +174,13 @@ public class StoreCreator {
     }
   }
 
+  /**
+   * Method to clear the data maps
+   */
+  public static void clearDataMaps() {
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
+  }
+
   public static CarbonLoadModel getCarbonLoadModel() throws Exception {
     String factFilePath =
         new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();