You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:12 UTC
[11/50] [abbrv] carbondata git commit: [CARBONDATA-2557]
[CARBONDATA-2472] [CARBONDATA-2570] Improve Carbon Reader performance on S3
and fixed datamap clear issue in reader
[CARBONDATA-2557] [CARBONDATA-2472] [CARBONDATA-2570] Improve Carbon Reader performance on S3 and fixed datamap clear issue in reader
[CARBONDATA-2557] [CARBONDATA-2472] Problem: CarbonReaderBuilder.build() is slow on S3. It takes around 8 seconds to finish build().
Solution: S3 is slow in listFiles, open, fileExists, and getCarbonFile operations. So, list all the calls to those APIs in the reader flow and remove the redundant checks.
[CARBONDATA-2570] Problem: Carbon SDK Reader — a second reader instance has an issue in cluster tests.
Solution: The blocklet datamaps of the first reader are not cleared properly in the cluster. Need to change the API call used to clear the blocklet datamap.
so change
DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear();
to
DataMapStoreManager.getInstance().clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier());
This closes #2345
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5f68a792
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5f68a792
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5f68a792
Branch: refs/heads/carbonstore
Commit: 5f68a792f2e83d15379740f715cf05d7ae9aaa05
Parents: 2f23486
Author: ajantha-bhat <aj...@gmail.com>
Authored: Sun May 27 22:49:23 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 5 16:23:27 2018 +0530
----------------------------------------------------------------------
.../core/datamap/dev/CacheableDataMap.java | 6 +-
.../core/datastore/SegmentTaskIndexStore.java | 2 +-
.../indexstore/BlockletDataMapIndexStore.java | 84 +++++++------
.../TableBlockIndexUniqueIdentifierWrapper.java | 52 ++++++++
.../blockletindex/BlockletDataMapFactory.java | 122 ++++++++-----------
.../blockletindex/SegmentIndexFileStore.java | 15 +++
.../core/metadata/schema/table/CarbonTable.java | 60 ++++-----
.../LatestFilesReadCommittedScope.java | 19 +--
.../SegmentUpdateStatusManager.java | 15 ++-
.../core/util/BlockletDataMapUtil.java | 50 +++++++-
.../apache/carbondata/core/util/CarbonUtil.java | 30 +++++
.../TestBlockletDataMapFactory.java | 13 +-
docs/sdk-guide.md | 10 --
.../examples/sdk/CarbonReaderExample.java | 1 -
.../carbondata/hadoop/CarbonRecordReader.java | 3 +-
.../hadoop/api/CarbonFileInputFormat.java | 97 ++++-----------
.../hadoop/api/CarbonInputFormat.java | 24 ++++
...FileInputFormatWithExternalCarbonTable.scala | 2 +-
...tCreateTableUsingSparkCarbonFileFormat.scala | 2 +-
.../TestNonTransactionalCarbonTable.scala | 11 +-
.../sdk/file/CarbonReaderBuilder.java | 51 ++------
.../carbondata/sdk/file/CarbonReaderTest.java | 4 +-
22 files changed, 375 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
index dba0840..e292c60 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
-import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
import org.apache.carbondata.core.memory.MemoryException;
/**
@@ -33,10 +33,10 @@ public interface CacheableDataMap {
/**
* Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier
*
- * @param tableBlockIndexUniqueIdentifier
+ * @param tableBlockIndexUniqueIdentifierWrapper
* @param blockletDataMapIndexWrapper
*/
- void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ void cache(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException;
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index d325f21..c642091 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -91,7 +91,7 @@ public class SegmentTaskIndexStore
segmentTaskIndexWrapper =
loadAndGetTaskIdToSegmentsMap(
tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
- CarbonTable.buildFromTablePath("name", "path", false),
+ CarbonTable.buildDummyTable("path"),
tableSegmentUniqueIdentifier);
} catch (IndexBuilderException e) {
throw new IOException(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index db49976..71a9b5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.util.BlockletDataMapUtil;
* blocks
*/
public class BlockletDataMapIndexStore
- implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> {
+ implements Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
/**
@@ -68,8 +68,10 @@ public class BlockletDataMapIndexStore
}
@Override
- public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier)
+ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper)
throws IOException {
+ TableBlockIndexUniqueIdentifier identifier =
+ identifierWrapper.getTableBlockIndexUniqueIdentifier();
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
(BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
@@ -84,7 +86,7 @@ public class BlockletDataMapIndexStore
// if the identifier is not a merge file we can directly load the datamaps
if (identifier.getMergeIndexFileName() == null) {
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
- .getBlockMetaInfoMap(identifier, indexFileStore, filesRead,
+ .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
carbonDataFileBlockMetaInfoMapping);
BlockletDataMap blockletDataMap =
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
@@ -96,9 +98,10 @@ public class BlockletDataMapIndexStore
BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
tableBlockIndexUniqueIdentifiers) {
- Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
- .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead,
- carbonDataFileBlockMetaInfoMapping);
+ Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil.getBlockMetaInfoMap(
+ new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
+ identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
+ carbonDataFileBlockMetaInfoMapping);
BlockletDataMap blockletDataMap =
loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
dataMaps.add(blockletDataMap);
@@ -119,26 +122,28 @@ public class BlockletDataMapIndexStore
return blockletDataMapIndexWrapper;
}
- @Override
- public List<BlockletDataMapIndexWrapper> getAll(
- List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+ @Override public List<BlockletDataMapIndexWrapper> getAll(
+ List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
+ throws IOException {
List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
new ArrayList<>(tableSegmentUniqueIdentifiers.size());
- List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
+ List<TableBlockIndexUniqueIdentifierWrapper> missedIdentifiersWrapper = new ArrayList<>();
BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null;
// Get the datamaps for each indexfile from cache.
try {
- for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
- BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier);
+ for (TableBlockIndexUniqueIdentifierWrapper
+ identifierWrapper : tableSegmentUniqueIdentifiers) {
+ BlockletDataMapIndexWrapper dataMapIndexWrapper =
+ getIfPresent(identifierWrapper);
if (dataMapIndexWrapper != null) {
blockletDataMapIndexWrappers.add(dataMapIndexWrapper);
} else {
- missedIdentifiers.add(identifier);
+ missedIdentifiersWrapper.add(identifierWrapper);
}
}
- if (missedIdentifiers.size() > 0) {
- for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) {
- blockletDataMapIndexWrapper = get(identifier);
+ if (missedIdentifiersWrapper.size() > 0) {
+ for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
+ blockletDataMapIndexWrapper = get(identifierWrapper);
blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
}
}
@@ -151,37 +156,40 @@ public class BlockletDataMapIndexStore
}
throw new IOException("Problem in loading segment blocks.", e);
}
+
return blockletDataMapIndexWrappers;
}
/**
* returns the SegmentTaskIndexWrapper
*
- * @param tableSegmentUniqueIdentifier
+ * @param tableSegmentUniqueIdentifierWrapper
* @return
*/
- @Override
- public BlockletDataMapIndexWrapper getIfPresent(
- TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+ @Override public BlockletDataMapIndexWrapper getIfPresent(
+ TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) {
return (BlockletDataMapIndexWrapper) lruCache.get(
- tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+ .getUniqueTableSegmentIdentifier());
}
/**
* method invalidate the segment cache for segment
*
- * @param tableSegmentUniqueIdentifier
+ * @param tableSegmentUniqueIdentifierWrapper
*/
- @Override
- public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
- lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ @Override public void invalidate(
+ TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) {
+ lruCache.remove(tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+ .getUniqueTableSegmentIdentifier());
}
@Override
- public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
- tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+ .getUniqueTableSegmentIdentifier();
Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
if (lock == null) {
lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
@@ -190,16 +198,16 @@ public class BlockletDataMapIndexStore
// as in that case clearing unsafe memory need to be taken card. If at all datamap entry
// in the cache need to be overwritten then use the invalidate interface
// and then use the put interface
- if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+ if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
synchronized (lock) {
- if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+ if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
try {
for (BlockletDataMap blockletDataMap: dataMaps) {
blockletDataMap.convertToUnsafeDMStore();
}
- lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper,
- wrapper.getMemorySize());
+ lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+ .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize());
} catch (Throwable e) {
// clear all the memory acquired by data map in case of any failure
for (DataMap blockletDataMap : dataMaps) {
@@ -264,14 +272,14 @@ public class BlockletDataMapIndexStore
/**
* The method clears the access count of table segments
*
- * @param tableSegmentUniqueIdentifiers
+ * @param tableSegmentUniqueIdentifiersWrapper
*/
- @Override
- public void clearAccessCount(
- List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) {
- for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
- BlockletDataMap cacheable =
- (BlockletDataMap) lruCache.get(identifier.getUniqueTableSegmentIdentifier());
+ @Override public void clearAccessCount(
+ List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiersWrapper) {
+ for (TableBlockIndexUniqueIdentifierWrapper
+ identifierWrapper : tableSegmentUniqueIdentifiersWrapper) {
+ BlockletDataMap cacheable = (BlockletDataMap) lruCache.get(
+ identifierWrapper.getTableBlockIndexUniqueIdentifier().getUniqueTableSegmentIdentifier());
cacheable.clear();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
new file mode 100644
index 0000000..3411397
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+
+/**
+ * Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable related info
+ * This is just a wrapper passed between methods like a context, This object must never be cached.
+ *
+ */
+public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // holds the reference to tableBlockIndexUniqueIdentifier
+ private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
+ // holds the reference to CarbonTable
+ private CarbonTable carbonTable;
+
+ public TableBlockIndexUniqueIdentifierWrapper(
+ TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) {
+ this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
+ this.carbonTable = carbonTable;
+ }
+
+ public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() {
+ return tableBlockIndexUniqueIdentifier;
+ }
+
+ public CarbonTable getCarbonTable() {
+ return carbonTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 318fc6e..c434e2e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -44,16 +44,12 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -81,7 +77,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
// segmentId -> list of index file
private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new ConcurrentHashMap<>();
- private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
+ private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> cache;
public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
super(carbonTable, dataMapSchema);
@@ -104,11 +100,15 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
- List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
new ArrayList<>(identifiers.size());
- tableBlockIndexUniqueIdentifiers.addAll(identifiers);
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
+ tableBlockIndexUniqueIdentifierWrappers.add(
+ new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+ this.getCarbonTable()));
+ }
List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
- cache.getAll(tableBlockIndexUniqueIdentifiers);
+ cache.getAll(tableBlockIndexUniqueIdentifierWrappers);
for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) {
dataMaps.addAll(wrapper.getDataMaps());
}
@@ -120,12 +120,6 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segment.getSegmentNo());
if (tableBlockIndexUniqueIdentifiers == null) {
- CarbonTable carbonTable = this.getCarbonTable();
- if (!carbonTable.getTableInfo().isTransactionalTable()) {
- // For NonTransactional table, compare the schema of all index files with inferred schema.
- // If there is a mismatch throw exception. As all files must be of same schema.
- validateSchemaForNewTranscationalTableFiles(segment, carbonTable);
- }
tableBlockIndexUniqueIdentifiers =
BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
@@ -133,46 +127,6 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
return tableBlockIndexUniqueIdentifiers;
}
- private void validateSchemaForNewTranscationalTableFiles(Segment segment, CarbonTable carbonTable)
- throws IOException {
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- Map<String, String> indexFiles = segment.getCommittedIndexFile();
- for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
- Path indexFile = new Path(indexFileEntry.getKey());
- org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
- indexFile.toString(), carbonTable.getTableName());
- TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
- tableInfo, identifier.getDatabaseName(),
- identifier.getTableName(),
- identifier.getTablePath());
- List<ColumnSchema> indexFileColumnList =
- wrapperTableInfo.getFactTable().getListOfColumns();
- List<ColumnSchema> tableColumnList =
- carbonTable.getTableInfo().getFactTable().getListOfColumns();
- if (!isSameColumnSchemaList(indexFileColumnList, tableColumnList)) {
- LOG.error("Schema of " + indexFile.getName()
- + " doesn't match with the table's schema");
- throw new IOException("All the files doesn't have same schema. "
- + "Unsupported operation on nonTransactional table. Check logs.");
- }
- }
- }
-
- private boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
- List<ColumnSchema> tableColumnList) {
- if (indexFileColumnList.size() != tableColumnList.size()) {
- LOG.error("Index file's column size is " + indexFileColumnList.size()
- + " but table's column size is " + tableColumnList.size());
- return false;
- }
- for (int i = 0; i < tableColumnList.size(); i++) {
- if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) {
- return false;
- }
- }
- return true;
- }
-
/**
* Get the blocklet detail information based on blockletid, blockid and segmentid. This method is
* exclusively for BlockletDataMapFactory as detail information is only available in this
@@ -191,9 +145,16 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
}
Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
+ Set<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
+ new HashSet<>(identifiers.size());
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
+ tableBlockIndexUniqueIdentifierWrappers.add(
+ new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+ this.getCarbonTable()));
+ }
// Retrieve each blocklets detail information from blocklet datamap
for (Blocklet blocklet : blocklets) {
- detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
+ detailedBlocklets.add(getExtendedBlocklet(tableBlockIndexUniqueIdentifierWrappers, blocklet));
}
return detailedBlocklets;
}
@@ -204,14 +165,24 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
if (blocklet instanceof ExtendedBlocklet) {
return (ExtendedBlocklet) blocklet;
}
- Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
- return getExtendedBlocklet(identifiers, blocklet);
+ Set<TableBlockIndexUniqueIdentifier> identifiers =
+ getTableBlockIndexUniqueIdentifiers(segment);
+
+ Set<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
+ new HashSet<>(identifiers.size());
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
+ tableBlockIndexUniqueIdentifierWrappers.add(
+ new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+ this.getCarbonTable()));
+ }
+ return getExtendedBlocklet(tableBlockIndexUniqueIdentifierWrappers, blocklet);
}
- private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
- Blocklet blocklet) throws IOException {
- for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
- BlockletDataMapIndexWrapper wrapper = cache.get(identifier);
+ private ExtendedBlocklet getExtendedBlocklet(
+ Set<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper, Blocklet blocklet)
+ throws IOException {
+ for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : identifiersWrapper) {
+ BlockletDataMapIndexWrapper wrapper = cache.get(identifierWrapper);
List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
for (DataMap dataMap : dataMaps) {
if (((BlockletDataMap) dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) {
@@ -265,12 +236,14 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
if (blockIndexes != null) {
for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
- BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndex);
+ TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper =
+ new TableBlockIndexUniqueIdentifierWrapper(blockIndex, this.getCarbonTable());
+ BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndexWrapper);
if (null != wrapper) {
List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
for (DataMap dataMap : dataMaps) {
if (dataMap != null) {
- cache.invalidate(blockIndex);
+ cache.invalidate(blockIndexWrapper);
dataMap.clear();
}
}
@@ -292,27 +265,28 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
- List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
+ List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
Path indexPath = new Path(mapDistributable.getFilePath());
String segmentNo = mapDistributable.getSegment().getSegmentNo();
if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
String parent = indexPath.getParent().toString();
- identifiers
- .add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo));
+ identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
+ new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo),
+ this.getCarbonTable()));
} else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString());
String parentPath = carbonFile.getParentFile().getAbsolutePath();
List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath());
for (String indexFile : indexFiles) {
- identifiers.add(
+ identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(),
- segmentNo));
+ segmentNo), this.getCarbonTable()));
}
}
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
try {
- List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiers);
+ List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiersWrapper);
for (BlockletDataMapIndexWrapper wrapper : wrappers) {
dataMaps.addAll(wrapper.getDataMaps());
}
@@ -356,9 +330,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
return false;
}
- @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ @Override
+ public void cache(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException {
- cache.put(tableBlockIndexUniqueIdentifier, blockletDataMapIndexWrapper);
+ cache.put(tableBlockIndexUniqueIdentifierWrapper, blockletDataMapIndexWrapper);
}
@Override
@@ -373,7 +348,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil
.filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
(BlockletDataMapDistributable) distributable);
- if (null == cache.getIfPresent(validIdentifier)) {
+ if (null == cache.getIfPresent(
+ new TableBlockIndexUniqueIdentifierWrapper(validIdentifier, this.getCarbonTable()))) {
((BlockletDataMapDistributable) distributable)
.setTableBlockIndexUniqueIdentifier(validIdentifier);
distributablesToBeLoaded.add(distributable);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index c2686d0..35e512d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -323,6 +323,21 @@ public class SegmentIndexFileStore {
/**
* List all the index files of the segment.
*
+ * @param carbonFile directory
+ * @return
+ */
+ public static CarbonFile[] getCarbonIndexFiles(CarbonFile carbonFile) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+ .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
+ }
+ });
+ }
+
+ /**
+ * List all the index files of the segment.
+ *
* @param segmentPath
* @return
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index ba051be..6949643 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -218,17 +218,9 @@ public class CarbonTable implements Serializable {
}
}
- public static CarbonTable buildFromTablePath(String tableName, String tablePath,
- boolean isTransactionalTable) throws IOException {
- if (isTransactionalTable) {
- return SchemaReader
- .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, "default", tableName));
- } else {
- // Infer the schema from the Carbondata file.
- TableInfo tableInfoInfer =
- SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "null", "null"), false);
- return CarbonTable.buildFromTableInfo(tableInfoInfer);
- }
+ public static CarbonTable buildDummyTable(String tablePath) throws IOException {
+ TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
+ return CarbonTable.buildFromTableInfo(tableInfoInfer);
}
public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath)
@@ -241,24 +233,7 @@ public class CarbonTable implements Serializable {
*/
public static CarbonTable buildFromTableInfo(TableInfo tableInfo) {
CarbonTable table = new CarbonTable();
- updateTableInfo(tableInfo);
- table.tableInfo = tableInfo;
- table.blockSize = tableInfo.getTableBlockSizeInMB();
- table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
- table.tableUniqueName = tableInfo.getTableUniqueName();
- table.setTransactionalTable(tableInfo.isTransactionalTable());
- table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
- table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
- if (tableInfo.getFactTable().getBucketingInfo() != null) {
- table.tableBucketMap.put(tableInfo.getFactTable().getTableName(),
- tableInfo.getFactTable().getBucketingInfo());
- }
- if (tableInfo.getFactTable().getPartitionInfo() != null) {
- table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
- tableInfo.getFactTable().getPartitionInfo());
- }
- table.hasDataMapSchema =
- null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
+ updateTableByTableInfo(table, tableInfo);
return table;
}
@@ -996,4 +971,31 @@ public class CarbonTable implements Serializable {
}
return indexColumn;
}
+
+ /**
+ * update the carbon table by using the passed tableInfo
+ *
+ * @param table
+ * @param tableInfo
+ */
+ public static void updateTableByTableInfo(CarbonTable table, TableInfo tableInfo) {
+ updateTableInfo(tableInfo);
+ table.tableInfo = tableInfo;
+ table.blockSize = tableInfo.getTableBlockSizeInMB();
+ table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
+ table.tableUniqueName = tableInfo.getTableUniqueName();
+ table.setTransactionalTable(tableInfo.isTransactionalTable());
+ table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
+ table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
+ if (tableInfo.getFactTable().getBucketingInfo() != null) {
+ table.tableBucketMap.put(tableInfo.getFactTable().getTableName(),
+ tableInfo.getFactTable().getBucketingInfo());
+ }
+ if (tableInfo.getFactTable().getPartitionInfo() != null) {
+ table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
+ tableInfo.getFactTable().getPartitionInfo());
+ }
+ table.hasDataMapSchema =
+ null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 6a1234e..63cfa21 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -23,7 +23,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.mutate.UpdateVO;
@@ -157,28 +156,20 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
@Override public void takeCarbonIndexFileSnapShot() throws IOException {
// Read the current file Path get the list of indexes from the path.
CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
- CarbonFile[] files = file.listFiles(new CarbonFileFilter() {
- @Override
- public boolean accept(CarbonFile file) {
- return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
- .endsWith(CarbonTablePath.CARBON_DATA_EXT) || file.getName().endsWith("Fact");
- }
- });
- if (files.length == 0) {
- // For nonTransactional table, files can be removed at any point of time.
- // So cannot assume files will be present
- throw new IOException("No files are present in the table location :" + carbonFilePath);
- }
Map<String, List<String>> indexFileStore = new HashMap<>();
Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();
CarbonFile[] carbonIndexFiles = null;
if (file.isDirectory()) {
if (segmentId == null) {
- carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
+ carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(file);
} else {
String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
}
+ if (carbonIndexFiles.length == 0) {
+ throw new IOException(
+ "No Index files are present in the table location :" + carbonFilePath);
+ }
for (int i = 0; i < carbonIndexFiles.length; i++) {
// TODO. If Required to support merge index, then this code has to be modified.
// TODO. Nested File Paths.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 1c53fbb..c2faadc 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -86,10 +86,19 @@ public class SegmentUpdateStatusManager {
this.identifier = table.getAbsoluteTableIdentifier();
// current it is used only for read function scenarios, as file update always requires to work
// on latest file status.
- segmentDetails = SegmentStatusManager.readLoadMetadata(
- CarbonTablePath.getMetadataPath(identifier.getTablePath()));
+ if (!table.getTableInfo().isTransactionalTable()) {
+ // fileExist is a costly operation, so decide based on the table type instead
+ segmentDetails = new LoadMetadataDetails[0];
+ } else {
+ segmentDetails = SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(identifier.getTablePath()));
+ }
isPartitionTable = table.isHivePartitionTable();
- updateDetails = readLoadMetadata();
+ if (segmentDetails.length != 0) {
+ updateDetails = readLoadMetadata();
+ } else {
+ updateDetails = new SegmentUpdateDetails[0];
+ }
populateMap();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 0d28b9f..518cd03 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -33,20 +33,31 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
public class BlockletDataMapUtil {
+ private static final Log LOG = LogFactory.getLog(BlockletDataMapUtil.class);
+
public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
- TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
- Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
- throws IOException {
+ TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
+ SegmentIndexFileStore indexFileStore, Set<String> filesRead,
+ Map<String, BlockMetaInfo> fileNameToMetaInfoMapping) throws IOException {
+ boolean isTransactionalTable = true;
+ TableBlockIndexUniqueIdentifier identifier =
+ identifierWrapper.getTableBlockIndexUniqueIdentifier();
+ List<ColumnSchema> tableColumnList = null;
if (identifier.getMergeIndexFileName() != null
&& indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
CarbonFile indexMergeFile = FileFactory.getCarbonFile(
@@ -67,7 +78,25 @@ public class BlockletDataMapUtil {
List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+ CarbonTable carbonTable = identifierWrapper.getCarbonTable();
+ if (carbonTable != null) {
+ isTransactionalTable = carbonTable.getTableInfo().isTransactionalTable();
+ tableColumnList =
+ carbonTable.getTableInfo().getFactTable().getListOfColumns();
+ }
for (DataFileFooter footer : indexInfo) {
+ if ((!isTransactionalTable) && (tableColumnList.size() != 0) &&
+ !isSameColumnSchemaList(footer.getColumnInTable(), tableColumnList)) {
+ LOG.error("Schema of " + identifier.getIndexFileName()
+ + " doesn't match with the table's schema");
+ throw new IOException("All the files doesn't have same schema. "
+ + "Unsupported operation on nonTransactional table. Check logs.");
+ }
+ if ((tableColumnList != null) && (tableColumnList.size() == 0)) {
+ // Carbon reader has used a dummy columnSchema. Update it with the inferred schema now
+ carbonTable.getTableInfo().getFactTable().setListOfColumns(footer.getColumnInTable());
+ CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo());
+ }
String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
if (null == blockMetaInfoMap.get(blockPath)) {
blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
@@ -156,6 +185,7 @@ public class BlockletDataMapUtil {
* This method will the index files tableBlockIndexUniqueIdentifiers of a merge index file
*
* @param identifier
+ * @param segmentIndexFileStore
* @return
* @throws IOException
*/
@@ -177,4 +207,18 @@ public class BlockletDataMapUtil {
return tableBlockIndexUniqueIdentifiers;
}
+ private static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
+ List<ColumnSchema> tableColumnList) {
+ if (indexFileColumnList.size() != tableColumnList.size()) {
+ LOG.error("Index file's column size is " + indexFileColumnList.size()
+ + " but table's column size is " + tableColumnList.size());
+ return false;
+ }
+ for (int i = 0; i < tableColumnList.size(); i++) {
+ if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5a7bce3..e1e5e16 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -53,6 +53,7 @@ import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
@@ -2371,6 +2372,35 @@ public final class CarbonUtil {
}
/**
+ * This method will prepare dummy tableInfo
+ *
+ * @param carbonDataFilePath
+ * @param tableName
+ * @return
+ */
+ public static TableInfo buildDummyTableInfo(String carbonDataFilePath,
+ String tableName, String dbName) {
+ // This method will be called during the SDK CarbonReader flow.
+ // This API will avoid IO operation to get the columnSchema list.
+ // ColumnSchema list will be filled during blocklet loading (where actual IO happens)
+ List<ColumnSchema> columnSchemaList = new ArrayList<>();
+ TableSchema tableSchema = getDummyTableSchema(tableName,columnSchemaList);
+ ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+ new ThriftWrapperSchemaConverterImpl();
+ org.apache.carbondata.format.TableSchema thriftFactTable =
+ thriftWrapperSchemaConverter.fromWrapperToExternalTableSchema(tableSchema);
+ org.apache.carbondata.format.TableInfo tableInfo =
+ new org.apache.carbondata.format.TableInfo(thriftFactTable,
+ new ArrayList<org.apache.carbondata.format.TableSchema>());
+ tableInfo.setDataMapSchemas(null);
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ tableInfo, dbName, tableName, carbonDataFilePath);
+ wrapperTableInfo.setTransactionalTable(false);
+ return wrapperTableInfo;
+ }
+
+ /**
* This method will infer the schema file from a given index file path
* @param indexFilePath
* @param tableName
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index dfbdd29..526f630 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -57,7 +58,9 @@ public class TestBlockletDataMapFactory {
private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
- private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
+ private TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper;
+
+ private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> cache;
@Before public void setUp()
throws ClassNotFoundException, IllegalAccessException, InvocationTargetException,
@@ -78,6 +81,8 @@ public class TestBlockletDataMapFactory {
tableBlockIndexUniqueIdentifier =
new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0",
"0_batchno0-0-1521012756709.carbonindex", null, "0");
+ tableBlockIndexUniqueIdentifierWrapper =
+ new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, carbonTable);
cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
}
@@ -86,12 +91,12 @@ public class TestBlockletDataMapFactory {
IllegalAccessException {
List<BlockletDataMap> dataMaps = new ArrayList<>();
Method method = BlockletDataMapFactory.class
- .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifier.class,
+ .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifierWrapper.class,
BlockletDataMapIndexWrapper.class);
method.setAccessible(true);
- method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier,
+ method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifierWrapper,
new BlockletDataMapIndexWrapper(dataMaps));
- BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+ BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper);
assert null != result;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 5dbb5ac..0f20dc3 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -460,16 +460,6 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
```
/**
- * Project all Columns for carbon reader
- *
- * @return CarbonReaderBuilder object
- * @throws IOException
- */
- public CarbonReaderBuilder projectAllColumns();
-```
-
-```
- /**
* Configure the transactional status of table
* If set to false, then reads the carbondata and carbonindex files from a flat folder structure.
* If set to true, then reads the carbondata and carbonindex files from segment folder structure.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index 8d3ff0d..ada1a8c 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -116,7 +116,6 @@ public class CarbonReaderExample {
// Read data
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
- .projectAllColumns()
.build();
System.out.println("\nData:");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index da84c00..4911e41 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -123,7 +123,8 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
}
}
// Clear the datamap cache
- DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear();
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier());
// close read support
readSupport.close();
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 8ed89d5..8755176 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -23,26 +23,21 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -105,8 +100,10 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
*/
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
+
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
@@ -115,6 +112,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
// get all valid segments and set them into the configuration
// check for externalTable segment (Segment_null)
// process and resolve the expression
+
ReadCommittedScope readCommittedScope = null;
if (carbonTable.isTransactionalTable()) {
readCommittedScope = new LatestFilesReadCommittedScope(
@@ -129,44 +127,33 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter);
- String segmentDir = null;
+ // if external table Segments are found, add it to the List
+ List<Segment> externalTableSegments = new ArrayList<Segment>();
+ Segment seg;
if (carbonTable.isTransactionalTable()) {
- segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
+ // In some cases SDK writes into the Segment Path instead of the Table Path, i.e. inside
+ // the "Fact/Part0/Segment_null". The segment in this case is named as "null".
+ // The table is denoted by default as a transactional table and goes through
+ // the path of CarbonFileInputFormat. The above scenario is handled in the below code.
+ seg = new Segment("null", null, readCommittedScope);
+ externalTableSegments.add(seg);
} else {
- segmentDir = identifier.getTablePath();
- }
- FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
- if (FileFactory.isFileExist(segmentDir, fileType)) {
- // if external table Segments are found, add it to the List
- List<Segment> externalTableSegments = new ArrayList<Segment>();
- Segment seg;
- if (carbonTable.isTransactionalTable()) {
- // SDK some cases write into the Segment Path instead of Table Path i.e. inside
- // the "Fact/Part0/Segment_null". The segment in this case is named as "null".
- // The table is denoted by default as a transactional table and goes through
- // the path of CarbonFileInputFormat. The above scenario is handled in the below code.
- seg = new Segment("null", null, readCommittedScope);
+ LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
+ for (LoadMetadataDetails load : loadMetadataDetails) {
+ seg = new Segment(load.getLoadName(), null, readCommittedScope);
externalTableSegments.add(seg);
- } else {
- LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
- for (LoadMetadataDetails load : loadMetadataDetails) {
- seg = new Segment(load.getLoadName(), null, readCommittedScope);
- externalTableSegments.add(seg);
- }
}
-
- Map<String, String> indexFiles =
- new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);
-
- if (indexFiles.size() == 0) {
- throw new RuntimeException("Index file not present to read the carbondata file");
- }
- // do block filtering and get split
- List<InputSplit> splits =
- getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
-
- return splits;
}
+ // do block filtering and get split
+ List<InputSplit> splits =
+ getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+ if (getColumnProjection(job.getConfiguration()) == null) {
+ // If the user projection is empty, use default all columns as projections.
+ // All column name will be filled inside getSplits, so can update only here.
+ String[] projectionColumns = projectAllColumns(carbonTable);
+ setColumnProjection(job.getConfiguration(), projectionColumns);
+ }
+ return splits;
}
return null;
}
@@ -185,45 +172,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
numSegments = validSegments.size();
List<InputSplit> result = new LinkedList<InputSplit>();
- UpdateVO invalidBlockVOForSegmentId = null;
- Boolean isIUDTable = false;
-
- SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable);
-
- isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
// for each segment fetch blocks matching filter in Driver BTree
List<CarbonInputSplit> dataBlocksOfSegment =
getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
validSegments, partitionInfo, oldPartitionIdList);
numBlocks = dataBlocksOfSegment.size();
- for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
-
- // Get the UpdateVO for those tables on which IUD operations being performed.
- if (isIUDTable) {
- invalidBlockVOForSegmentId =
- updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
- }
- String[] deleteDeltaFilePath = null;
- if (isIUDTable) {
- // In case IUD is not performed in this table avoid searching for
- // invalidated blocks.
- if (CarbonUtil
- .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
- invalidBlockVOForSegmentId, updateStatusManager)) {
- continue;
- }
- // When iud is done then only get delete delta files for a block
- try {
- deleteDeltaFilePath = updateStatusManager
- .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
- result.add(inputSplit);
- }
+ result.addAll(dataBlocksOfSegment);
return result;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 05c70f8..485b087 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.scan.expression.Expression;
@@ -675,4 +676,27 @@ m filterExpression
return false;
}
}
+
+ /**
+ * Project all Columns for carbon reader
+ *
+ * @param carbonTable table whose columns are to be projected
+ * @return String array of column names
+ */
+ public String[] projectAllColumns(CarbonTable carbonTable) {
+ List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns();
+ List<String> projectColumn = new ArrayList<>();
+ for (ColumnSchema cols : colList) {
+ if (cols.getSchemaOrdinal() != -1) {
+ projectColumn.add(cols.getColumnUniqueId());
+ }
+ }
+ String[] projectionColumns = new String[projectColumn.size()];
+ int i = 0;
+ for (String columnName : projectColumn) {
+ projectionColumns[i] = columnName;
+ i++;
+ }
+ return projectionColumns;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index e6d39d3..0e6f0c7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -184,7 +184,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
{
sql("select * from sdkOutputTable").show(false)
}
- assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+ assert(exception.getMessage().contains("Error while taking index snapshot"))
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index 211bc8c..d7e500e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -346,7 +346,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
{
sql("select * from sdkOutputTable").show(false)
}
- assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+ assert(exception.getMessage().contains("Error while taking index snapshot"))
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 095d12d..14a63ca 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -993,7 +993,14 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("select * from sdkOutputTable").show(false)
}
assert(exception.getMessage()
- .contains("All the files doesn't have same schema"))
+ .contains("Problem in loading segment blocks."))
+
+ val exception1 =
+ intercept[IOException] {
+ sql("select count(*) from sdkOutputTable").show(false)
+ }
+ assert(exception1.getMessage()
+ .contains("Problem in loading segment blocks."))
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
@@ -1025,7 +1032,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("select * from sdkOutputTable").show(false)
}
assert(exception.getMessage()
- .contains("All the files doesn't have same schema"))
+ .contains("Problem in loading segment blocks."))
sql("DROP TABLE sdkOutputTable")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 9d7470e..98aa6e0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
@@ -51,12 +50,6 @@ public class CarbonReaderBuilder {
private boolean isTransactionalTable;
/**
- * It will be true if use the projectAllColumns method,
- * it will be false if use the projection method
- */
- private boolean isProjectAllColumns = true;
-
- /**
* Construct a CarbonReaderBuilder with table path and table name
*
* @param tablePath table path
@@ -76,7 +69,6 @@ public class CarbonReaderBuilder {
public CarbonReaderBuilder projection(String[] projectionColumnNames) {
Objects.requireNonNull(projectionColumnNames);
this.projectionColumns = projectionColumnNames;
- isProjectAllColumns = false;
return this;
}
@@ -96,33 +88,6 @@ public class CarbonReaderBuilder {
}
/**
- * Project all Columns for carbon reader
- *
- * @return CarbonReaderBuilder object
- * @throws IOException
- */
- public CarbonReaderBuilder projectAllColumns() throws IOException {
- CarbonTable carbonTable = CarbonTable
- .buildFromTablePath(tableName, tablePath, isTransactionalTable);
-
- List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns();
- List<String> projectColumn = new ArrayList<String>();
- for (ColumnSchema cols : colList) {
- if (cols.getSchemaOrdinal() != -1) {
- projectColumn.add(cols.getColumnUniqueId());
- }
- }
- projectionColumns = new String[projectColumn.size()];
- int i = 0;
- for (String columnName : projectColumn) {
- projectionColumns[i] = columnName;
- i++;
- }
- isProjectAllColumns = true;
- return this;
- }
-
- /**
* Configure the filter expression for carbon reader
*
* @param filterExpression filter expression
@@ -209,8 +174,13 @@ public class CarbonReaderBuilder {
* @throws InterruptedException
*/
public <T> CarbonReader<T> build() throws IOException, InterruptedException {
- CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath, isTransactionalTable);
-
+ // DB name is not applicable for SDK reader as, table will be never registered.
+ CarbonTable table;
+ if (isTransactionalTable) {
+ table = CarbonTable.buildFromTablePath(tableName, "default", tablePath);
+ } else {
+ table = CarbonTable.buildDummyTable(tablePath);
+ }
final CarbonFileInputFormat format = new CarbonFileInputFormat();
final Job job = new Job(new Configuration());
format.setTableInfo(job.getConfiguration(), table.getTableInfo());
@@ -220,10 +190,11 @@ public class CarbonReaderBuilder {
if (filterExpression != null) {
format.setFilterPredicates(job.getConfiguration(), filterExpression);
}
- if (isProjectAllColumns) {
- projectAllColumns();
+
+ if (projectionColumns != null) {
+ // set the user projection
+ format.setColumnProjection(job.getConfiguration(), projectionColumns);
}
- format.setColumnProjection(job.getConfiguration(), projectionColumns);
final List<InputSplit> splits =
format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index db118cd..a8aa795 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -385,9 +385,8 @@ public class CarbonReaderTest extends TestCase {
// Write to a Non Transactional Table
TestUtil.writeFilesAndVerify(new Schema(fields), path, true, false);
- CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
+ CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{"name", "age"})
- .isTransactionalTable(false)
.build();
// expected output after sorting
@@ -892,7 +891,6 @@ public class CarbonReaderTest extends TestCase {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.isTransactionalTable(true)
- .projectAllColumns()
.build();
// expected output after sorting