You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/07 08:56:11 UTC
[1/2] carbondata git commit: [CARBONDATA-2310] Refactored code to
improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable
object from DataMap to Wrapper
Repository: carbondata
Updated Branches:
refs/heads/master a7926ea13 -> 531ecdf3f
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 6803fc8..c6efd77 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -70,9 +70,15 @@ public class SegmentIndexFileStore {
*/
private Map<String, byte[]> carbonIndexMapWithFullPath;
+ /**
+ * Maps each merge index file path read by readMergeFile to the names of the index files it contains
+ */
+ private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
+
public SegmentIndexFileStore() {
carbonIndexMap = new HashMap<>();
carbonIndexMapWithFullPath = new HashMap<>();
+ carbonMergeFileToIndexFilesMap = new HashMap<>();
}
/**
@@ -201,6 +207,28 @@ public class SegmentIndexFileStore {
}
/**
+ * Read all index file names of the segment
+ *
+ * @param segmentPath
+ * @return
+ * @throws IOException
+ */
+ public Map<String, String> getMergeOrIndexFilesFromSegment(String segmentPath)
+ throws IOException {
+ CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+ Map<String, String> indexFiles = new HashMap<>();
+ for (int i = 0; i < carbonIndexFiles.length; i++) {
+ if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ indexFiles
+ .put(carbonIndexFiles[i].getAbsolutePath(), carbonIndexFiles[i].getAbsolutePath());
+ } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
* List all the index files inside merge file.
* @param mergeFile
* @return
@@ -221,13 +249,14 @@ public class SegmentIndexFileStore {
* @param mergeFilePath
* @throws IOException
*/
- private void readMergeFile(String mergeFilePath) throws IOException {
+ public void readMergeFile(String mergeFilePath) throws IOException {
ThriftReader thriftReader = new ThriftReader(mergeFilePath);
try {
thriftReader.open();
MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
List<String> file_names = indexHeader.getFile_names();
+ carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
assert (file_names.size() == fileData.size());
@@ -298,8 +327,8 @@ public class SegmentIndexFileStore {
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
return carbonFile.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
- return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
- .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
+ return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+ .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
}
});
}
@@ -428,4 +457,8 @@ public class SegmentIndexFileStore {
+ " is " + (System.currentTimeMillis() - startTime));
return carbondataFileFooter.getBlockletList();
}
+
+  /**
+   * Returns the mapping filled in by {@link #readMergeFile(String)}: each merge-index file path
+   * that was read maps to the index file names listed in its header.
+   *
+   * <p>The live internal map is returned, not a copy.
+   *
+   * @return merge-index file path to contained index file names
+   */
+  public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
+    return this.carbonMergeFileToIndexFilesMap;
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index b764bdf..496a1d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,13 +16,15 @@
*/
package org.apache.carbondata.core.indexstore.row;
+import java.io.Serializable;
+
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
/**
* It is just a normal row to store data. Implementation classes could be safe and unsafe.
* TODO move this class a global row and use across loading after DataType is changed class
*/
-public abstract class DataMapRow {
+public abstract class DataMapRow implements Serializable {
protected CarbonRowSchema[] schemas;
@@ -88,4 +90,13 @@ public abstract class DataMapRow {
public int getColumnCount() {
return schemas.length;
}
+
+ /**
+ * Default implementation: returns this row unchanged.
+ *
+ * NOTE(review): subclasses backed by unsafe memory (e.g. UnsafeDataMapRow) presumably
+ * override this to copy their data into a regular row; the override is not visible here.
+ *
+ * @return this row
+ */
+ public DataMapRow convertToSafeRow() {
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 1b95984..1c1ecad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -30,7 +30,12 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
*/
public class UnsafeDataMapRow extends DataMapRow {
- private MemoryBlock block;
+ private static final long serialVersionUID = -1156704133552046321L;
+
+ // The block lives in unsafe memory, so it should not be serialized.
+ // If serialization is ever required, override writeObject so that it also
+ // clears the unsafe memory after serialization.
+ private transient MemoryBlock block;
private int pointer;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 813be4a..1a77467 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -16,12 +16,16 @@
*/
package org.apache.carbondata.core.indexstore.schema;
+import java.io.Serializable;
+
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
* It just have 2 types right now, either fixed or variable.
*/
-public abstract class CarbonRowSchema {
+public abstract class CarbonRowSchema implements Serializable {
+
+ private static final long serialVersionUID = -8061282029097686495L;
protected DataType dataType;
@@ -29,6 +33,10 @@ public abstract class CarbonRowSchema {
this.dataType = dataType;
}
+  /**
+   * Replaces the data type described by this schema entry.
+   *
+   * @param dataType the new data type
+   */
+  public void setDataType(DataType dataType) {
+    this.dataType = dataType;
+  }
+
/**
* Either fixed or variable length.
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index dff496b..9326b1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -492,6 +492,35 @@ public class SegmentFileStore {
}
/**
+ * Gets all index files from this segment
+ * @return
+ */
+ public Map<String, String> getIndexOrMergeFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ String mergeFileName = entry.getValue().getMergeFileName();
+ if (null != mergeFileName) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName,
+ entry.getValue().mergeFileName);
+ } else {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
* Gets all carbon index files from this segment
* @return
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index c8ac15a..c7bcf2e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -97,6 +97,11 @@ public class TableInfo implements Serializable, Writable {
private List<RelationIdentifier> parentRelationIdentifiers;
+ /**
+ * flag to check whether any schema modification operation has happened after creation of table
+ */
+ private boolean isSchemaModified;
+
public TableInfo() {
dataMapSchemaList = new ArrayList<>();
isTransactionalTable = true;
@@ -115,6 +120,18 @@ public class TableInfo implements Serializable, Writable {
public void setFactTable(TableSchema factTable) {
this.factTable = factTable;
updateParentRelationIdentifier();
+ updateIsSchemaModified();
+ }
+
+  /**
+   * Recomputes {@code isSchemaModified} from the fact table's schema evolution history.
+   *
+   * <p>Creating the table adds the first schema evolution entry, and currently only alter
+   * operations add further entries. So more than one entry means the schema was modified after
+   * creation. When the fact table, its schema evolution, or the entry list is unavailable, the
+   * flag is left unchanged instead of failing with a NullPointerException.
+   */
+  private void updateIsSchemaModified() {
+    if (null == factTable || null == factTable.getSchemaEvalution()
+        || null == factTable.getSchemaEvalution().getSchemaEvolutionEntryList()) {
+      return;
+    }
+    isSchemaModified =
+        factTable.getSchemaEvalution().getSchemaEvolutionEntryList().size() > 1;
+  }
private void updateParentRelationIdentifier() {
@@ -273,6 +290,7 @@ public class TableInfo implements Serializable, Writable {
parentRelationIdentifiers.get(i).write(out);
}
}
+ out.writeBoolean(isSchemaModified);
}
@Override public void readFields(DataInput in) throws IOException {
@@ -308,6 +326,7 @@ public class TableInfo implements Serializable, Writable {
this.parentRelationIdentifiers.add(relationIdentifier);
}
}
+ this.isSchemaModified = in.readBoolean();
}
public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
@@ -343,4 +362,9 @@ public class TableInfo implements Serializable, Writable {
public void setTransactionalTable(boolean transactionalTable) {
isTransactionalTable = transactionalTable;
}
+
+  /**
+   * Tells whether a schema modification was detected after table creation.
+   *
+   * @return the flag derived from the fact table's schema evolution entry count when the fact
+   *     table was set, or the value restored by {@code readFields}; {@code false} by default
+   */
+  public boolean isSchemaModified() {
+    return this.isSchemaModified;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index bc4a90d..41ce31c 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -68,11 +68,11 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
if (segment.getSegmentFileName() == null) {
String path =
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
- indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+ indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
} else {
SegmentFileStore fileStore =
new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
- indexFiles = fileStore.getIndexFiles();
+ indexFiles = fileStore.getIndexOrMergeFiles();
}
return indexFiles;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
new file mode 100644
index 0000000..0d28b9f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class BlockletDataMapUtil {
+
+ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
+ TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
+ Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
+ throws IOException {
+ if (identifier.getMergeIndexFileName() != null
+ && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+ CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getMergeIndexFileName());
+ if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
+ indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+ filesRead.add(indexMergeFile.getPath());
+ }
+ }
+ if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+ indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName()) });
+ }
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+ List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+ for (DataFileFooter footer : indexInfo) {
+ String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+ if (null == blockMetaInfoMap.get(blockPath)) {
+ blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
+ }
+ }
+ return blockMetaInfoMap;
+ }
+
+ /**
+ * This method will create file name to block Meta Info Mapping. This method will reduce the
+ * number of namenode calls and using this method one namenode will fetch 1000 entries
+ *
+ * @param segmentFilePath
+ * @return
+ * @throws IOException
+ */
+ public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
+ String segmentFilePath) throws IOException {
+ Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
+ if (carbonFile instanceof AbstractDFSCarbonFile) {
+ PathFilter pathFilter = new PathFilter() {
+ @Override public boolean accept(Path path) {
+ return CarbonTablePath.isCarbonDataFile(path.getName());
+ }
+ };
+ CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
+ for (CarbonFile file : carbonFiles) {
+ String[] location = file.getLocations();
+ long len = file.getSize();
+ BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+ fileNameToMetaInfoMapping.put(file.getPath().toString(), blockMetaInfo);
+ }
+ }
+ return fileNameToMetaInfoMapping;
+ }
+
+ private static BlockMetaInfo createBlockMetaInfo(
+ Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile) {
+ FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
+ switch (fileType) {
+ case LOCAL:
+ CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile, fileType);
+ return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
+ default:
+ return fileNameToMetaInfoMapping.get(carbonDataFile);
+ }
+ }
+
+ public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment)
+ throws IOException {
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>();
+ Map<String, String> indexFiles = segment.getCommittedIndexFile();
+ for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+ Path indexFile = new Path(indexFileEntry.getKey());
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(),
+ indexFileEntry.getValue(), segment.getSegmentNo()));
+ }
+ return tableBlockIndexUniqueIdentifiers;
+ }
+
+ /**
+ * This method will filter out the TableBlockIndexUniqueIdentifier belongs to that distributable
+ *
+ * @param tableBlockIndexUniqueIdentifiers
+ * @param distributable
+ * @return
+ */
+ public static TableBlockIndexUniqueIdentifier filterIdentifiersBasedOnDistributable(
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+ BlockletDataMapDistributable distributable) {
+ TableBlockIndexUniqueIdentifier validIdentifier = null;
+ String fileName = CarbonTablePath.DataFileUtil.getFileName(distributable.getFilePath());
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ if (fileName.equals(tableBlockIndexUniqueIdentifier.getIndexFileName())) {
+ validIdentifier = tableBlockIndexUniqueIdentifier;
+ break;
+ }
+ }
+ return validIdentifier;
+ }
+
+ /**
+ * This method will the index files tableBlockIndexUniqueIdentifiers of a merge index file
+ *
+ * @param identifier
+ * @return
+ * @throws IOException
+ */
+ public static List<TableBlockIndexUniqueIdentifier> getIndexFileIdentifiersFromMergeFile(
+ TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore segmentIndexFileStore)
+ throws IOException {
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+ String mergeFilePath =
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName();
+ segmentIndexFileStore.readMergeFile(mergeFilePath);
+ List<String> indexFiles =
+ segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
+ for (String indexFile : indexFiles) {
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(identifier.getIndexFilePath(), indexFile,
+ identifier.getIndexFileName(), identifier.getSegmentId()));
+ }
+ return tableBlockIndexUniqueIdentifiers;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 8544da9..3823aef 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -219,6 +219,11 @@ public class SessionParams implements Serializable, Cloneable {
throw new InvalidConfigurationException(
String.format("Invalid configuration of %s, datamap does not exist", key));
}
+ } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+ isValid = CarbonUtil.validateBoolean(value);
+ if (!isValid) {
+ throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+ }
} else {
throw new InvalidConfigurationException(
"The key " + key + " not supported for dynamic configuration.");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index b9f4838..62192ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -533,7 +533,7 @@ public class CarbonTablePath {
/**
* Return the file name from file path
*/
- private static String getFileName(String dataFilePath) {
+ public static String getFileName(String dataFilePath) {
int endIndex = dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
if (endIndex > -1) {
return dataFilePath.substring(endIndex + 1, dataFilePath.length());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
new file mode 100644
index 0000000..dfbdd29
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+
+import mockit.Deencapsulation;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for BlockletDataMapFactory caching and for filtering out already cached
+ * distributables.
+ *
+ * NOTE(review): both tests go through caches created from CacheProvider.getInstance() for
+ * DRIVER_BLOCKLET_DATAMAP, and addDataMapToCache reads back through a separately created cache.
+ * That implies shared storage, so getValidDistributables appears to rely on addDataMapToCache
+ * having run first. JUnit does not guarantee method order; confirm.
+ * NOTE(review): the checks use the plain Java assert statement, which is a no-op unless the
+ * JVM runs with -ea.
+ */
+public class TestBlockletDataMapFactory {
+
+ private CarbonTable carbonTable;
+
+ private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+ private TableInfo tableInfo;
+
+ private BlockletDataMapFactory blockletDataMapFactory;
+
+ private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
+ private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
+
+ // Builds a CarbonTable through reflection on its first declared constructor, wires in a
+ // TableInfo with a fixed table identifier, and creates the factory under test with a
+ // DRIVER_BLOCKLET_DATAMAP cache injected into its "cache" field.
+ @Before public void setUp()
+ throws ClassNotFoundException, IllegalAccessException, InvocationTargetException,
+ InstantiationException {
+ tableInfo = new TableInfo();
+ Constructor<?> constructor =
+ Class.forName("org.apache.carbondata.core.metadata.schema.table.CarbonTable")
+ .getDeclaredConstructors()[0];
+ constructor.setAccessible(true);
+ carbonTable = (CarbonTable) constructor.newInstance();
+ absoluteTableIdentifier =
+ AbsoluteTableIdentifier.from("/opt/store/default/carbon_table/", "default", "carbon_table");
+ Deencapsulation.setField(tableInfo, "identifier", absoluteTableIdentifier);
+ Deencapsulation.setField(carbonTable, "tableInfo", tableInfo);
+ blockletDataMapFactory = new BlockletDataMapFactory(carbonTable, new DataMapSchema());
+ Deencapsulation.setField(blockletDataMapFactory, "cache",
+ CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP));
+ tableBlockIndexUniqueIdentifier =
+ new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+ "0_batchno0-0-1521012756709.carbonindex", null, "0");
+ cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+ }
+
+ // Calls the factory's cache(identifier, wrapper) method reflectively with an empty wrapper and
+ // checks that the entry is visible through the separately created cache.
+ @Test public void addDataMapToCache()
+ throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
+ IllegalAccessException {
+ List<BlockletDataMap> dataMaps = new ArrayList<>();
+ Method method = BlockletDataMapFactory.class
+ .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifier.class,
+ BlockletDataMapIndexWrapper.class);
+ method.setAccessible(true);
+ method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier,
+ new BlockletDataMapIndexWrapper(dataMaps));
+ BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+ assert null != result;
+ }
+
+ // Passes two distributables (index files ...709 and ...701) of segment 0, with
+ // getTableBlockIndexUniqueIdentifiers mocked to return identifiers for both files, and expects
+ // exactly one of them to be reported as uncached.
+ @Test public void getValidDistributables() throws IOException {
+ BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
+ "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
+ Segment segment = new Segment("0", null);
+ blockletDataMapDistributable.setSegment(segment);
+ BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
+ "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");
+ blockletDataMapDistributable1.setSegment(segment);
+ List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
+ dataMapDistributables.add(blockletDataMapDistributable);
+ dataMapDistributables.add(blockletDataMapDistributable1);
+ new MockUp<BlockletDataMapFactory>() {
+ @Mock Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+ Segment segment) {
+ TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 =
+ new TableBlockIndexUniqueIdentifier(
+ "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+ "0_batchno0-0-1521012756701.carbonindex", null, "0");
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(3);
+ tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
+ tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1);
+ return tableBlockIndexUniqueIdentifiers;
+ }
+ };
+ List<DataMapDistributable> validDistributables =
+ blockletDataMapFactory.getAllUncachedDistributables(dataMapDistributables);
+ assert 1 == validDistributables.size();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 8be1e2e..32af8d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,21 +16,39 @@
*/
package org.apache.carbondata.hadoop;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
/**
* CacheClient : Holds all the Cache access clients for Btree, Dictionary
*/
public class CacheClient {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CacheClient.class.getName());
+
+ private final Object lock = new Object();
+
// segment access client for driver LRU cache
private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
segmentAccessClient;
+ private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
+ segmentProperties = new ConcurrentHashMap<>();
+
public CacheClient() {
Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
@@ -45,4 +63,35 @@ public class CacheClient {
public void close() {
segmentAccessClient.close();
}
+
+  /**
+   * Returns the {@link SegmentProperties} for the given table schema and column cardinality.
+   * A previously built instance is reused while the schema is unchanged, so a new
+   * SegmentProperties is not constructed on every call.
+   *
+   * @param tableIdentifier   identifier of the table the schema belongs to
+   * @param columnsInTable    column schemas of the table
+   * @param columnCardinality column cardinality values
+   * @return the cached or newly built segment properties
+   */
+  public SegmentProperties getSegmentProperties(AbsoluteTableIdentifier tableIdentifier,
+      List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+    SegmentTaskIndexStore.SegmentPropertiesWrapper key =
+        new SegmentTaskIndexStore.SegmentPropertiesWrapper(tableIdentifier, columnsInTable,
+            columnCardinality);
+    // Fast path: the map is a ConcurrentHashMap, so an unsynchronized read is safe.
+    SegmentProperties properties = segmentProperties.get(key);
+    if (null == properties) {
+      synchronized (lock) {
+        // Re-check under the lock in case another thread holding it already built the entry.
+        properties = segmentProperties.get(key);
+        if (null == properties) {
+          // All data files share common segment properties, so one instance built from the
+          // table schema can serve all of them (useful in query handling).
+          LOGGER.info("Constructing new SegmentProperties");
+          properties = new SegmentProperties(columnsInTable, columnCardinality);
+          segmentProperties.put(key, properties);
+        }
+      }
+    }
+    return properties;
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
new file mode 100644
index 0000000..6835184
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Base {@link DataMapJob} that supplies default implementations of both {@code execute}
+ * overloads, so concrete jobs only override the ones they actually support.
+ */
+public abstract class AbstractDataMapJob implements DataMapJob {
+
+  /**
+   * Default implementation: does nothing.
+   */
+  @Override
+  public void execute(CarbonTable carbonTable,
+      FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
+    // intentionally empty
+  }
+
+  /**
+   * Default implementation: performs no pruning and returns {@code null}. Jobs that prune
+   * (such as the Spark job) must override this.
+   */
+  @Override
+  public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf filter) {
+    return null;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 2af147d..0e8cf6a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -67,7 +67,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
- protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
CarbonTable carbonTableTemp;
if (carbonTable == null) {
// carbon table should be created either from deserialized table info (schema saved in
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 25cc228..a5d4df2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -156,7 +156,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
/**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
- protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
+ public abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
throws IOException;
public static void setTablePath(Configuration configuration, String tablePath) {
@@ -172,7 +172,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
}
- public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+ public static void setDataMapJob(Configuration configuration, Object dataMapJob)
throws IOException {
if (dataMapJob != null) {
String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
@@ -466,15 +466,30 @@ m filterExpression
+ /**
+ * Prunes the blocklets of the given segments through the given DataMapJob, then applies the
+ * datamap expression to the pruned result.
+ */
private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper,
DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException {
- DistributableDataMapFormat datamapDstr =
- new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
- partitionsToPrune, BlockletDataMapFactory.class.getName());
- List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+ // NOTE(review): the format is built reflectively from a hard-coded name of a class in this
+ // same package and then cast straight back to DistributableDataMapFormat, so the reflection
+ // buys no decoupling and loses compile-time checking of the constructor call.
+ String className = "org.apache.carbondata.hadoop.api.DistributableDataMapFormat";
+ FileInputFormat dataMapFormat =
+ createDataMapJob(carbonTable, dataMapExprWrapper, segmentIds, partitionsToPrune, className);
+ List<ExtendedBlocklet> prunedBlocklets =
+ dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, resolver);
// Apply expression on the blocklets.
prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
return prunedBlocklets;
}
+
+  /**
+   * Reflectively instantiates the input format class {@code clsName}. The table, the datamap
+   * expression, the segments, the partitions to prune and the name of
+   * {@link BlockletDataMapFactory} are passed to a constructor that accepts those five arguments.
+   *
+   * @param carbonTable        table to prune
+   * @param dataMapExprWrapper datamap expression to apply
+   * @param segments           segments to prune
+   * @param partitionsToPrune  partitions to prune
+   * @param clsName            fully qualified name of the input format class
+   * @return the new input format instance
+   * @throws RuntimeException if the class cannot be loaded, has no matching constructor, or
+   *                          the constructor fails
+   */
+  public static FileInputFormat createDataMapJob(CarbonTable carbonTable,
+      DataMapExprWrapper dataMapExprWrapper, List<Segment> segments,
+      List<PartitionSpec> partitionsToPrune, String clsName) {
+    Object[] args = { carbonTable, dataMapExprWrapper, segments, partitionsToPrune,
+        BlockletDataMapFactory.class.getName() };
+    try {
+      // getDeclaredConstructors() returns constructors in no particular order, so select the
+      // one whose parameters accept these arguments instead of blindly taking element [0].
+      for (Constructor<?> cons : Class.forName(clsName).getDeclaredConstructors()) {
+        if (acceptsArguments(cons, args)) {
+          return (FileInputFormat) cons.newInstance(args);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    throw new IllegalStateException(
+        "No constructor of " + clsName + " accepts the datamap format arguments");
+  }
+
+  /**
+   * Returns true if {@code cons} can be invoked with {@code args}; a null argument matches any
+   * non-primitive parameter type.
+   */
+  private static boolean acceptsArguments(Constructor<?> cons, Object[] args) {
+    Class<?>[] paramTypes = cons.getParameterTypes();
+    if (paramTypes.length != args.length) {
+      return false;
+    }
+    for (int i = 0; i < args.length; i++) {
+      boolean matches = args[i] == null
+          ? !paramTypes[i].isPrimitive()
+          : paramTypes[i].isInstance(args[i]);
+      if (!matches) {
+        return false;
+      }
+    }
+    return true;
+  }
+
/**
* Prune the segments from the already pruned blocklets.
* @param segments
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f93be63..cd34bb8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -103,7 +103,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
/**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
- protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
if (carbonTable == null) {
// carbon table should be created either from deserialized table info (schema saved in
// hive metastore) or by reading schema in HDFS (schema saved in HDFS)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index 64936aa..c439219 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,15 +19,21 @@ package org.apache.carbondata.hadoop.api;
import java.io.Serializable;
import java.util.List;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
/**
* Distributable datamap job to execute the #DistributableDataMapFormat in cluster. it prunes the
* datamaps distributably and returns the final blocklet list
*/
public interface DataMapJob extends Serializable {
+ /**
+ * Executes this job for the given table with an input format whose values are
+ * BlockletDataMapIndexWrapper instances.
+ * NOTE(review): the intended use of this overload is not visible here (AbstractDataMapJob
+ * implements it as a no-op) -- TODO document what implementations must do.
+ */
+ void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);
+
List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
FilterResolverIntf filter);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 36c7414..3208a28 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -22,6 +22,8 @@ import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Locale;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -49,6 +51,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
*/
public class CarbonInputFormatUtil {
+  /**
+   * Logger for this utility class.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonInputFormatUtil.class.getName());
+
public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
AbsoluteTableIdentifier identifier,
Job job) throws IOException {
@@ -58,6 +66,7 @@ public class CarbonInputFormatUtil {
CarbonTableInputFormat.setTableName(
job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+ setDataMapJobIfConfigured(job.getConfiguration());
return carbonInputFormat;
}
@@ -71,6 +80,7 @@ public class CarbonInputFormatUtil {
CarbonTableInputFormat.setTableName(
job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+ setDataMapJobIfConfigured(job.getConfiguration());
return carbonTableInputFormat;
}
@@ -108,11 +118,10 @@ public class CarbonInputFormatUtil {
CarbonInputFormat.setQuerySegment(conf, identifier);
CarbonInputFormat.setFilterPredicates(conf, filterExpression);
CarbonInputFormat.setColumnProjection(conf, columnProjection);
- if (dataMapJob != null &&
- Boolean.valueOf(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT))) {
+ if (dataMapJob != null) {
CarbonInputFormat.setDataMapJob(conf, dataMapJob);
+ } else {
+ setDataMapJobIfConfigured(conf);
}
// when validate segments is disabled in thread local update it to CarbonTableInputFormat
CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
@@ -147,6 +156,32 @@ public class CarbonInputFormatUtil {
return format;
}
+  /**
+   * Registers the Spark DataMap job ({@code org.apache.carbondata.spark.rdd.SparkDataMapJob})
+   * in the given configuration when that class can be instantiated. No property is consulted
+   * here. If the class is unavailable, {@link #createDataMapJob(String)} yields {@code null},
+   * and CarbonInputFormat#setDataMapJob only serializes a non-null job.
+   *
+   * @param conf configuration to update
+   * @throws IOException propagated from CarbonInputFormat#setDataMapJob, which serializes the
+   *                     job into the configuration
+   */
+  public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
+    Object sparkDataMapJob =
+        createDataMapJob("org.apache.carbondata.spark.rdd.SparkDataMapJob");
+    CarbonTableInputFormat.setDataMapJob(conf, sparkDataMapJob);
+  }
+
+  /**
+   * Creates an instance of the DataMap job class with the given name, using its no-argument
+   * constructor.
+   *
+   * <p>This is best effort: if the class is not on the classpath or cannot be instantiated,
+   * the failure is logged and {@code null} is returned.
+   *
+   * @param className fully qualified name of the DataMap job class
+   * @return the new instance, or {@code null} if it could not be created
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      // Look up the no-arg constructor explicitly: getDeclaredConstructors() returns
+      // constructors in no particular order, so element [0] could be one that needs arguments.
+      return Class.forName(className).getDeclaredConstructor().newInstance();
+    } catch (Exception | LinkageError e) {
+      // LinkageError covers a class that is present but whose own dependencies are missing.
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
public static String createJobTrackerID(java.util.Date date) {
return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 17c57b6..b0a59d4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -41,6 +41,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+ .addProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, "true")
sql("use lucene")
sql("DROP TABLE IF EXISTS normal_test")
sql(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0d960d0..24a6927 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -56,6 +56,7 @@ import org.apache.carbondata.hadoop._
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
@@ -554,12 +555,7 @@ class CarbonScanRDD[T: ClassTag](
CarbonInputFormat.setQuerySegment(conf, identifier)
CarbonInputFormat.setFilterPredicates(conf, filterExpression)
CarbonInputFormat.setColumnProjection(conf, columnProjection)
- CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
- if (CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
- CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
- }
+ CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
// when validate segments is disabled in thread local update it to CarbonTableInputFormat
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -599,12 +595,7 @@ class CarbonScanRDD[T: ClassTag](
CarbonInputFormat.setQuerySegment(conf, identifier)
CarbonInputFormat.setFilterPredicates(conf, filterExpression)
CarbonInputFormat.setColumnProjection(conf, columnProjection)
- CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
- if (CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
- CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
- }
+ CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
// when validate segments is disabled in thread local update it to CarbonTableInputFormat
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 8d2f9ee..f51c3bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -29,12 +29,12 @@ import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledExcepti
import org.apache.carbondata.core.indexstore.ExtendedBlocklet
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.hadoop.api.{AbstractDataMapJob, DistributableDataMapFormat}
/**
* Spark job to execute datamap job and prune all the datamaps distributable
*/
-class SparkDataMapJob extends DataMapJob {
+class SparkDataMapJob extends AbstractDataMapJob {
override def execute(dataMapFormat: DistributableDataMapFormat,
filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 488a53d..6622246 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
case class CarbonCountStar(
attributesRaw: Seq[Attribute],
@@ -74,11 +75,13 @@ case class CarbonCountStar(
val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val jobConf: JobConf = new JobConf(new Configuration)
SparkHadoopUtil.get.addCredentials(jobConf)
+ CarbonInputFormat.setTableInfo(jobConf, carbonTable.getTableInfo)
val job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
CarbonInputFormat
.setTransactionalTable(job.getConfiguration,
carbonTable.getTableInfo.isTransactionalTable)
+ CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
(job, carbonInputFormat)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index cce23dc..29dcec9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -115,6 +115,15 @@ object CarbonSetCommand {
"\" carbon.datamap.visible.<database_name>.<table_name>.<database_name>" +
" = <true/false> \" format")
}
+ } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+ if (key.split("\\.").length == 6) {
+ sessionParams.addProperty(key.toLowerCase(), value)
+ }
+ else {
+ throw new MalformedCarbonCommandException(
+ "property should be in \" carbon.load.datamaps.parallel.<database_name>" +
+ ".<table_name>=<true/false> \" format.")
+ }
}
}
[2/2] carbondata git commit: [CARBONDATA-2310] Refactored code to
improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable
object from DataMap to Wrapper
Posted by ra...@apache.org.
[CARBONDATA-2310] Refactored code to improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper
This PR has two JIRA fixes
[CARBONDATA-2310] Refactored code to improve Distributable interface
[CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper
This closes #2244
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/531ecdf3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/531ecdf3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/531ecdf3
Branch: refs/heads/master
Commit: 531ecdf3f40c064d4ff6ad36c43fa90a2d423588
Parents: a7926ea
Author: dhatchayani <dh...@gmail.com>
Authored: Fri Apr 27 23:03:52 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon May 7 13:11:29 2018 +0530
----------------------------------------------------------------------
.../org/apache/carbondata/core/cache/Cache.java | 10 +
.../dictionary/AbstractDictionaryCache.java | 6 +
.../core/constants/CarbonCommonConstants.java | 3 +
.../core/datamap/dev/CacheableDataMap.java | 51 ++++
.../core/datastore/SegmentTaskIndexStore.java | 7 +
.../filesystem/AbstractDFSCarbonFile.java | 5 +-
.../core/datastore/filesystem/CarbonFile.java | 3 +-
.../datastore/filesystem/LocalCarbonFile.java | 3 +-
.../core/indexstore/AbstractMemoryDMStore.java | 63 +++++
.../indexstore/BlockletDataMapIndexStore.java | 187 ++++++++-------
.../indexstore/BlockletDataMapIndexWrapper.java | 52 +++++
.../core/indexstore/BlockletDetailInfo.java | 66 ++++--
.../core/indexstore/SafeMemoryDMStore.java | 105 +++++++++
.../TableBlockIndexUniqueIdentifier.java | 5 +-
.../core/indexstore/UnsafeMemoryDMStore.java | 25 +-
.../blockletindex/BlockletDataMap.java | 232 ++++++++++++-------
.../BlockletDataMapDistributable.java | 12 +
.../blockletindex/BlockletDataMapFactory.java | 127 ++++++----
.../blockletindex/BlockletDataMapModel.java | 12 +
.../blockletindex/SegmentIndexFileStore.java | 39 +++-
.../core/indexstore/row/DataMapRow.java | 13 +-
.../core/indexstore/row/UnsafeDataMapRow.java | 7 +-
.../core/indexstore/schema/CarbonRowSchema.java | 10 +-
.../core/metadata/SegmentFileStore.java | 29 +++
.../core/metadata/schema/table/TableInfo.java | 24 ++
.../TableStatusReadCommittedScope.java | 4 +-
.../core/util/BlockletDataMapUtil.java | 180 ++++++++++++++
.../carbondata/core/util/SessionParams.java | 5 +
.../core/util/path/CarbonTablePath.java | 2 +-
.../TestBlockletDataMapFactory.java | 126 ++++++++++
.../apache/carbondata/hadoop/CacheClient.java | 49 ++++
.../hadoop/api/AbstractDataMapJob.java | 42 ++++
.../hadoop/api/CarbonFileInputFormat.java | 2 +-
.../hadoop/api/CarbonInputFormat.java | 27 ++-
.../hadoop/api/CarbonTableInputFormat.java | 2 +-
.../carbondata/hadoop/api/DataMapJob.java | 6 +
.../hadoop/util/CarbonInputFormatUtil.java | 43 +++-
.../lucene/LuceneFineGrainDataMapSuite.scala | 1 +
.../carbondata/spark/rdd/CarbonScanRDD.scala | 15 +-
.../carbondata/spark/rdd/SparkDataMapJob.scala | 4 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 3 +
.../execution/command/CarbonHiveCommands.scala | 9 +
42 files changed, 1335 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
index 04fa18a..6df36fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.core.cache;
import java.io.IOException;
import java.util.List;
+import org.apache.carbondata.core.memory.MemoryException;
+
/**
* A semi-persistent mapping from keys to values. Cache entries are manually added using
* #get(Key), #getAll(List<Keys>) , and are stored in the cache until
@@ -69,6 +71,14 @@ public interface Cache<K, V> {
void invalidate(K key);
/**
+ * This method will add the value to the cache for the given key.
+ * NOTE(review): not every implementation supports this; AbstractDictionaryCache and
+ * SegmentTaskIndexStore throw UnsupportedOperationException.
+ *
+ * @param key key under which the value is cached
+ * @param value value to cache
+ * @throws IOException as declared; the conditions depend on the implementation
+ * @throws MemoryException as declared; presumably raised when memory for the value cannot be
+ * obtained -- TODO confirm per implementation
+ */
+ void put(K key, V value) throws IOException, MemoryException;
+
+ /**
* Access count of Cacheable entry will be decremented
*
* @param keys
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index fb67208..83c7237 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -59,6 +59,12 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
initThreadPoolSize();
}
+  /**
+   * Direct insertion is not supported by dictionary caches.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
/**
* This method will initialize the thread pool size to be used for creating the
* max number of threads for a job
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f9bf220..56607b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1726,6 +1726,9 @@ public final class CarbonCommonConstants {
*/
public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
+ /**
+ * Prefix of the property that enables parallel datamap loading for a table. The full key is
+ * {@code carbon.load.datamaps.parallel.<database_name>.<table_name>} with value true/false;
+ * CarbonSetCommand accepts it as a session property only in that six-part form.
+ */
+ public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel.";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
new file mode 100644
index 0000000..dba0840
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * Interface for data map caching
+ */
+public interface CacheableDataMap {
+
+ /**
+ * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier
+ *
+ * @param tableBlockIndexUniqueIdentifier
+ * @param blockletDataMapIndexWrapper
+ */
+ void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException;
+
+ /**
+ * Get all the uncached distributables from the list.
+ *
+ * @param distributables
+ * @return
+ */
+ List<DataMapDistributable> getAllUncachedDistributables(List<DataMapDistributable> distributables)
+ throws IOException;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 537c635..d325f21 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.datastore.block.SegmentTaskIndex;
import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -142,6 +143,12 @@ public class SegmentTaskIndexStore
lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
+ @Override
+ public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value)
+ throws IOException, MemoryException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
/**
* returns block timestamp value from the given task
* @param taskKey
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 0419405..7255237 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -526,7 +527,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
}
@Override
- public CarbonFile[] locationAwareListFiles() throws IOException {
+ public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
if (null != fileStatus && fileStatus.isDirectory()) {
List<FileStatus> listStatus = new ArrayList<>();
Path path = fileStatus.getPath();
@@ -534,7 +535,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus fileStatus = iter.next();
- if (fileStatus.getLen() > 0) {
+ if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
listStatus.add(fileStatus);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index eb65dfd..a104137 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
public interface CarbonFile {
@@ -41,7 +42,7 @@ public interface CarbonFile {
* It returns list of files with location details.
* @return
*/
- CarbonFile[] locationAwareListFiles() throws IOException;
+ CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException;
String getName();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index d28e85e..60b7e17 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -49,6 +49,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;
@@ -448,7 +449,7 @@ public class LocalCarbonFile implements CarbonFile {
return file.createNewFile();
}
- @Override public CarbonFile[] locationAwareListFiles() throws IOException {
+ @Override public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
return listFiles();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
new file mode 100644
index 0000000..e6bc691
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+
+/**
+ * Base class for stores that hold datamap rows ({@link DataMapRow}).
+ *
+ * <p>Subclasses decide where the rows are kept. The default {@link #finishWriting()} does
+ * nothing and the default {@link #convertToUnsafeDMStore()} is unsupported.
+ */
+public abstract class AbstractMemoryDMStore implements Serializable {
+
+  /**
+   * Declared explicitly so that adding members to this base class does not silently change the
+   * serialized form of every subclass.
+   */
+  private static final long serialVersionUID = -2849210732106651473L;
+
+  /** Whether {@link #freeMemory()} has already run; maintained by subclasses. */
+  protected boolean isMemoryFreed;
+
+  /** Schema describing the layout of every row in this store. */
+  protected CarbonRowSchema[] schema;
+
+  /** Id of the task that created this store, captured from the thread-local task info. */
+  protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
+  public AbstractMemoryDMStore(CarbonRowSchema[] schema) {
+    this.schema = schema;
+  }
+
+  /**
+   * Appends a row to the store.
+   *
+   * @param indexRow row to append
+   * @throws MemoryException if the implementation cannot obtain memory for the row
+   */
+  public abstract void addIndexRow(DataMapRow indexRow) throws MemoryException;
+
+  /**
+   * @param index zero-based row position
+   * @return the row stored at {@code index}
+   */
+  public abstract DataMapRow getDataMapRow(int index);
+
+  /** Releases the memory held by this store. */
+  public abstract void freeMemory();
+
+  /** @return the size accounted for the stored rows, as tracked by the implementation */
+  public abstract int getMemoryUsed();
+
+  public CarbonRowSchema[] getSchema() {
+    return schema;
+  }
+
+  /** @return the number of rows in the store */
+  public abstract int getRowCount();
+
+  /**
+   * Hook called once all rows have been added.
+   *
+   * @throws MemoryException if the implementation fails to finalize its memory
+   */
+  public void finishWriting() throws MemoryException {
+    // do nothing in default implementation
+  }
+
+  /**
+   * Copies this store's rows into an {@link UnsafeMemoryDMStore}.
+   *
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    throw new UnsupportedOperationException("Operation not allowed");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 167a04e..ba4193e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.indexstore;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -30,26 +29,19 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
/**
* Class to handle loading, unloading,clearing,storing of the table
* blocks
*/
public class BlockletDataMapIndexStore
- implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+ implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
/**
@@ -76,106 +68,93 @@ public class BlockletDataMapIndexStore
}
@Override
- public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier)
+ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier)
throws IOException {
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
- BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
- if (dataMap == null) {
+ BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
+ (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
+ List<BlockletDataMap> dataMaps = new ArrayList<>();
+ if (blockletDataMapIndexWrapper == null) {
try {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
Set<String> filesRead = new HashSet<>();
- Map<String, BlockMetaInfo> blockMetaInfoMap =
- getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
- dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
- } catch (MemoryException e) {
+ long memorySize = 0L;
+ String segmentFilePath = identifier.getIndexFilePath();
+ Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil
+ .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+ // if the identifier is not a merge file we can directly load the datamaps
+ if (identifier.getMergeIndexFileName() == null) {
+ Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+ .getBlockMetaInfoMap(identifier, indexFileStore, filesRead,
+ carbonDataFileBlockMetaInfoMapping);
+ BlockletDataMap blockletDataMap =
+ loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
+ memorySize += blockletDataMap.getMemorySize();
+ dataMaps.add(blockletDataMap);
+ blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+ } else {
+ // if the identifier is a merge file then collect the index files and load the datamaps
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
+ for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+ .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead,
+ carbonDataFileBlockMetaInfoMapping);
+ BlockletDataMap blockletDataMap =
+ loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
+ memorySize += blockletDataMap.getMemorySize();
+ dataMaps.add(blockletDataMap);
+ }
+ blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+ }
+ lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
+ memorySize);
+ } catch (Throwable e) {
+ // clear all the memory used by datamaps loaded
+ for (DataMap dataMap : dataMaps) {
+ dataMap.clear();
+ }
LOGGER.error("memory exception when loading datamap: " + e.getMessage());
throw new RuntimeException(e.getMessage(), e);
}
}
- return dataMap;
- }
-
- private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
- SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException {
- if (identifier.getMergeIndexFileName() != null) {
- CarbonFile indexMergeFile = FileFactory.getCarbonFile(
- identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getMergeIndexFileName());
- if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
- indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
- filesRead.add(indexMergeFile.getPath());
- }
- }
- if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
- indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
- identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getIndexFileName()) });
- }
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
- Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
- List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
- identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
- for (DataFileFooter footer : indexInfo) {
- String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
- if (FileFactory.isFileExist(blockPath)) {
- blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
- } else {
- LOGGER.warn("Skipping invalid block " + footer.getBlockInfo().getBlockUniqueName()
- + " The block does not exist. The block might be got deleted due to clean up post"
- + " update/delete operation over table.");
- }
- }
- return blockMetaInfoMap;
- }
-
- private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
- CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
- if (carbonFile instanceof AbstractDFSCarbonFile) {
- RemoteIterator<LocatedFileStatus> iter =
- ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
- LocatedFileStatus fileStatus = iter.next();
- String[] location = fileStatus.getBlockLocations()[0].getHosts();
- long len = fileStatus.getLen();
- return new BlockMetaInfo(location, len);
- } else {
- return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
- }
+ return blockletDataMapIndexWrapper;
}
@Override
- public List<BlockletDataMap> getAll(
+ public List<BlockletDataMapIndexWrapper> getAll(
List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
- List<BlockletDataMap> blockletDataMaps =
+ List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
new ArrayList<>(tableSegmentUniqueIdentifiers.size());
List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
+ BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null;
// Get the datamaps for each indexfile from cache.
try {
for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
- BlockletDataMap ifPresent = getIfPresent(identifier);
- if (ifPresent != null) {
- blockletDataMaps.add(ifPresent);
+ BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier);
+ if (dataMapIndexWrapper != null) {
+ blockletDataMapIndexWrappers.add(dataMapIndexWrapper);
} else {
missedIdentifiers.add(identifier);
}
}
if (missedIdentifiers.size() > 0) {
- SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
- Set<String> filesRead = new HashSet<>();
- for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
- Map<String, BlockMetaInfo> blockMetaInfoMap =
- getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
- blockletDataMaps.add(
- loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
+ for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) {
+ blockletDataMapIndexWrapper = get(identifier);
+ blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
}
}
} catch (Throwable e) {
- for (BlockletDataMap dataMap : blockletDataMaps) {
- dataMap.clear();
+ if (null != blockletDataMapIndexWrapper) {
+ List<BlockletDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
+ for (DataMap dataMap : dataMaps) {
+ dataMap.clear();
+ }
}
throw new IOException("Problem in loading segment blocks.", e);
}
- return blockletDataMaps;
+ return blockletDataMapIndexWrappers;
}
/**
@@ -185,9 +164,9 @@ public class BlockletDataMapIndexStore
* @return
*/
@Override
- public BlockletDataMap getIfPresent(
+ public BlockletDataMapIndexWrapper getIfPresent(
TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
- return (BlockletDataMap) lruCache.get(
+ return (BlockletDataMapIndexWrapper) lruCache.get(
tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
@@ -201,6 +180,44 @@ public class BlockletDataMapIndexStore
lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
+ /**
+ * Adds the given wrapper to the LRU cache under the key of tableBlockIndexUniqueIdentifier.
+ * Every wrapped datamap is first converted via convertToUnsafeDMStore(), and the sum of their
+ * getMemorySize() values is passed to the cache as the entry size.
+ * If an entry for the key is already present (checked both before and after taking the
+ * per-segment lock), this method returns without caching the wrapper and without clearing
+ * its datamaps.
+ * On any failure while converting or caching, all datamaps of the wrapper are cleared and
+ * an IOException wrapping the cause is thrown.
+ */
+ @Override
+ public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
+ String uniqueTableSegmentIdentifier =
+ tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ // one lock object per segment key, so puts for different segments do not block each other
+ Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+ if (lock == null) {
+ lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+ }
+ long memorySize = 0L;
+ // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
+ // as in that case clearing unsafe memory needs to be taken care of. If a datamap entry
+ // in the cache needs to be overwritten, first use the invalidate interface
+ // and then use the put interface
+ if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+ synchronized (lock) {
+ // re-check under the lock: another thread may have cached the entry meanwhile
+ if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+ List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+ try {
+ for (BlockletDataMap blockletDataMap: dataMaps) {
+ // convert first so the measured size is that of the converted store
+ blockletDataMap.convertToUnsafeDMStore();
+ memorySize += blockletDataMap.getMemorySize();
+ }
+ lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper,
+ memorySize);
+ } catch (Throwable e) {
+ // clear all the memory acquired by data map in case of any failure
+ for (DataMap blockletDataMap : dataMaps) {
+ blockletDataMap.clear();
+ }
+ throw new IOException("Problem in adding datamap to cache.", e);
+ }
+ }
+ }
+ }
+ }
+
+
/**
* Below method will be used to load the segment of segments
* One segment may have multiple task , so table segment will be loaded
@@ -228,8 +245,6 @@ public class BlockletDataMapIndexStore
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
blockMetaInfoMap, identifier.getSegmentId()));
- lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
- dataMap.getMemorySize());
}
return dataMap;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
new file mode 100644
index 0000000..d674cb4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * A cacheable wrapper of datamaps
+ */
+public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
+
+ private List<BlockletDataMap> dataMaps;
+
+ public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) {
+ this.dataMaps = dataMaps;
+ }
+
+ @Override public long getFileTimeStamp() {
+ return 0;
+ }
+
+ @Override public int getAccessCount() {
+ return 0;
+ }
+
+ @Override public long getMemorySize() {
+ return 0;
+ }
+
+ public List<BlockletDataMap> getDataMaps() {
+ return dataMaps;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 660f4c1..8bae7fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -22,20 +22,29 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.hadoop.io.Writable;
-import org.xerial.snappy.Snappy;
/**
* Blocklet detail information to be sent to each executor
*/
public class BlockletDetailInfo implements Serializable, Writable {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDetailInfo.class.getName());
+
+ private static final long serialVersionUID = 7957493757421513808L;
+
private int rowCount;
private short pagesCount;
@@ -50,6 +59,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
private BlockletInfo blockletInfo;
+ private byte[] blockletInfoBinary;
+
private long blockFooterOffset;
private List<ColumnSchema> columnSchemas;
@@ -83,6 +94,13 @@ public class BlockletDetailInfo implements Serializable, Writable {
}
public BlockletInfo getBlockletInfo() {
+ if (null == blockletInfo) {
+ try {
+ setBlockletInfoFromBinary();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
return blockletInfo;
}
@@ -90,6 +108,26 @@ public class BlockletDetailInfo implements Serializable, Writable {
this.blockletInfo = blockletInfo;
}
+  /**
+   * Deserializes {@code blockletInfoBinary} into {@code blockletInfo} if no BlockletInfo is set
+   * yet and a non-empty binary form is available.
+   *
+   * <p>The field is assigned only after a successful read. A failed read therefore leaves
+   * {@code blockletInfo} null, so a later call retries, instead of caching a partially
+   * populated object.
+   *
+   * @throws IOException if the binary form cannot be read; the original exception is the cause
+   */
+  private void setBlockletInfoFromBinary() throws IOException {
+    if (null == this.blockletInfo && null != blockletInfoBinary && blockletInfoBinary.length > 0) {
+      BlockletInfo info = new BlockletInfo();
+      try (DataInputStream inputStream =
+          new DataInputStream(new ByteArrayInputStream(blockletInfoBinary))) {
+        info.readFields(inputStream);
+      } catch (IOException e) {
+        LOGGER.error(e, "Problem in reading blocklet info");
+        throw new IOException("Problem in reading blocklet info." + e.getMessage(), e);
+      }
+      blockletInfo = info;
+    }
+  }
+
public int[] getDimLens() {
return dimLens;
}
@@ -131,6 +169,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
out.writeLong(blockFooterOffset);
out.writeInt(columnSchemaBinary.length);
out.write(columnSchemaBinary);
+ out.writeInt(blockletInfoBinary.length);
+ out.write(blockletInfoBinary);
out.writeLong(blockSize);
}
@@ -153,6 +193,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
byte[] schemaArray = new byte[bytesSize];
in.readFully(schemaArray);
readColumnSchema(schemaArray);
+ int byteSize = in.readInt();
+ blockletInfoBinary = new byte[byteSize];
+ in.readFully(blockletInfoBinary);
+ setBlockletInfoFromBinary();
blockSize = in.readLong();
}
@@ -162,17 +206,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
* @throws IOException
*/
public void readColumnSchema(byte[] schemaArray) throws IOException {
- // uncompress it.
- schemaArray = Snappy.uncompress(schemaArray);
- ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
- DataInput schemaInput = new DataInputStream(schemaStream);
- columnSchemas = new ArrayList<>();
- int size = schemaInput.readShort();
- for (int i = 0; i < size; i++) {
- ColumnSchema columnSchema = new ColumnSchema();
- columnSchema.readFields(schemaInput);
- columnSchemas.add(columnSchema);
- }
+ BlockletDataMap blockletDataMap = new BlockletDataMap();
+ columnSchemas = blockletDataMap.readColumnSchema(schemaArray);
}
/**
@@ -223,4 +258,9 @@ public class BlockletDetailInfo implements Serializable, Writable {
public byte[] getColumnSchemaBinary() {
return columnSchemaBinary;
}
+
+  /**
+   * Sets the serialized form of the blocklet info.
+   *
+   * <p>{@link #write(DataOutput)} writes this array's length and bytes, so it must be non-null
+   * before the object is written. {@link #getBlockletInfo()} deserializes it lazily when no
+   * BlockletInfo object has been set.
+   *
+   * @param binary serialized {@link BlockletInfo} bytes
+   */
+  public void setBlockletInfoBinary(byte[] binary) {
+    blockletInfoBinary = binary;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
new file mode 100644
index 0000000..d7a1b8f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Store the data map row @{@link DataMapRow} data to memory.
+ */
+public class SafeMemoryDMStore extends AbstractMemoryDMStore {
+
+ /**
+ * holds all blocklets metadata in memory
+ */
+ private List<DataMapRow> dataMapRows =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ private int runningLength;
+
+ public SafeMemoryDMStore(CarbonRowSchema[] schema) {
+ super(schema);
+ }
+
+ /**
+ * Add the index row to dataMapRows, basically to in memory.
+ *
+ * @param indexRow
+ * @return
+ */
+ @Override
+ public void addIndexRow(DataMapRow indexRow) throws MemoryException {
+ dataMapRows.add(indexRow);
+ runningLength += indexRow.getTotalSizeInBytes();
+ }
+
+ @Override
+ public DataMapRow getDataMapRow(int index) {
+ assert (index < dataMapRows.size());
+ return dataMapRows.get(index);
+ }
+
+ @Override
+ public void freeMemory() {
+ if (!isMemoryFreed) {
+ if (null != dataMapRows) {
+ dataMapRows.clear();
+ dataMapRows = null;
+ }
+ isMemoryFreed = true;
+ }
+ }
+
+ @Override
+ public int getMemoryUsed() {
+ return runningLength;
+ }
+
+ @Override
+ public int getRowCount() {
+ return dataMapRows.size();
+ }
+
+ @Override
+ public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+ setSchemaDataType();
+ UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema);
+ for (DataMapRow dataMapRow : dataMapRows) {
+ unsafeMemoryDMStore.addIndexRow(dataMapRow);
+ }
+ unsafeMemoryDMStore.finishWriting();
+ return unsafeMemoryDMStore;
+ }
+
+ /**
+ * Set the dataType to the schema. Needed in case of serialization / deserialization
+ */
+ private void setSchemaDataType() {
+ for (CarbonRowSchema carbonRowSchema : schema) {
+ carbonRowSchema.setDataType(DataTypeUtil.valueOf(carbonRowSchema.getDataType(), 0, 0));
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index c907fa8..3226ceb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.indexstore;
+import java.io.Serializable;
import java.util.Objects;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -24,7 +25,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
/**
* Class holds the indexFile information to uniquely identitify the carbon index
*/
-public class TableBlockIndexUniqueIdentifier {
+public class TableBlockIndexUniqueIdentifier implements Serializable {
+
+ private static final long serialVersionUID = 5808112137916196344L;
private String indexFilePath;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 31ecac2..ca5e2dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
@@ -32,9 +31,11 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
/**
* Store the data map row @{@link DataMapRow} data to unsafe.
*/
-public class UnsafeMemoryDMStore {
+public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
- private MemoryBlock memoryBlock;
+ private static final long serialVersionUID = -5344592407101055335L;
+
+ private transient MemoryBlock memoryBlock;
private static int capacity = 8 * 1024;
@@ -42,18 +43,12 @@ public class UnsafeMemoryDMStore {
private int runningLength;
- private boolean isMemoryFreed;
-
- private CarbonRowSchema[] schema;
-
private int[] pointers;
private int rowCount;
- private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-
public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException {
- this.schema = schema;
+ super(schema);
this.allocatedSize = capacity;
this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
this.pointers = new int[1000];
@@ -92,7 +87,7 @@ public class UnsafeMemoryDMStore {
* @param indexRow
* @return
*/
- public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
+ public void addIndexRow(DataMapRow indexRow) throws MemoryException {
// First calculate the required memory to keep the row in unsafe
int rowSize = indexRow.getTotalSizeInBytes();
// Check whether allocated memory is sufficient or not.
@@ -172,7 +167,7 @@ public class UnsafeMemoryDMStore {
}
}
- public UnsafeDataMapRow getUnsafeRow(int index) {
+ public DataMapRow getDataMapRow(int index) {
assert (index < rowCount);
return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
}
@@ -205,12 +200,8 @@ public class UnsafeMemoryDMStore {
return runningLength;
}
- public CarbonRowSchema[] getSchema() {
- return schema;
- }
-
public int getRowCount() {
return rowCount;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index f72dc06..3ff9cdc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -18,10 +18,12 @@ package org.apache.carbondata.core.indexstore.blockletindex;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -32,18 +34,19 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -76,11 +79,13 @@ import org.xerial.snappy.Snappy;
/**
* Datamap implementation for blocklet.
*/
-public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
+public class BlockletDataMap extends CoarseGrainDataMap implements Serializable {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+ private static final long serialVersionUID = -2170289352240810993L;
+
private static int KEY_INDEX = 0;
private static int MIN_VALUES_INDEX = 1;
@@ -119,14 +124,17 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
private static int SEGMENTID = 5;
- private UnsafeMemoryDMStore unsafeMemoryDMStore;
+ private AbstractMemoryDMStore memoryDMStore;
- private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
+ private AbstractMemoryDMStore summaryDMStore;
- private SegmentProperties segmentProperties;
+ // As it is a heavy object it is not recommended to serialize this object
+ private transient SegmentProperties segmentProperties;
private int[] columnCardinality;
+ private long blockletSchemaTime;
+
@Override
public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
long startTime = System.currentTimeMillis();
@@ -150,11 +158,12 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
if (segmentProperties == null) {
List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
schemaBinary = convertSchemaToBinary(columnInTable);
+ blockletSchemaTime = fileFooter.getSchemaUpdatedTimeStamp();
columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
- createSchema(segmentProperties);
+ createSchema(segmentProperties, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
- segmentId);
+ segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
}
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
@@ -185,21 +194,23 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
}
}
}
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.finishWriting();
+ if (memoryDMStore != null) {
+ memoryDMStore.finishWriting();
}
- if (null != unsafeMemorySummaryDMStore) {
+ if (null != summaryDMStore) {
addTaskSummaryRowToUnsafeMemoryStore(
summaryRow,
schemaBinary,
filePath,
fileName,
segmentId);
- unsafeMemorySummaryDMStore.finishWriting();
+ summaryDMStore.finishWriting();
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
+ + (System.currentTimeMillis() - startTime));
}
- LOGGER.info(
- "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
- System.currentTimeMillis() - startTime));
}
private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
@@ -207,10 +218,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
int[] minMaxLen = segmentProperties.getColumnsValueSize();
List<BlockletInfo> blockletList = fileFooter.getBlockletList();
- CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+ CarbonRowSchema[] schema = memoryDMStore.getSchema();
// Add one row to maintain task level min max for segment pruning
if (!blockletList.isEmpty() && summaryRow == null) {
- summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+ summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
}
for (int index = 0; index < blockletList.size(); index++) {
DataMapRow row = new DataMapRowImpl(schema);
@@ -226,7 +237,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
// compute and set task level min values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
TASK_MIN_VALUES_INDEX, true);
ordinal++;
taskMinMaxOrdinal++;
@@ -234,7 +245,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
// compute and set task level max values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
TASK_MAX_VALUES_INDEX, false);
ordinal++;
@@ -269,7 +280,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
row.setShort((short) relativeBlockletId++, ordinal++);
// Store block size
row.setLong(blockMetaInfo.getSize(), ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ memoryDMStore.addIndexRow(row);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -295,10 +306,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
BlockMetaInfo blockMetaInfo) {
int[] minMaxLen = segmentProperties.getColumnsValueSize();
BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
- CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+ CarbonRowSchema[] schema = memoryDMStore.getSchema();
// Add one row to maintain task level min max for segment pruning
if (summaryRow == null) {
- summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+ summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
}
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
@@ -317,14 +328,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
// compute and set task level min values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
TASK_MIN_VALUES_INDEX, true);
ordinal++;
taskMinMaxOrdinal++;
row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
// compute and set task level max values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
TASK_MAX_VALUES_INDEX, false);
ordinal++;
@@ -357,7 +368,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
// store block size
row.setLong(blockMetaInfo.getSize(), ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ memoryDMStore.addIndexRow(row);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -378,7 +389,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
summaryRow.setByteArray(segmentId, SEGMENTID);
try {
- unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+ summaryDMStore.addIndexRow(summaryRow);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -516,7 +527,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
taskMinMaxRow.setRow(row, ordinal);
}
- private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
+ private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
+ throws MemoryException {
List<CarbonRowSchema> indexSchemas = new ArrayList<>();
// Index key
@@ -553,8 +565,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
// for storing block length.
indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
- unsafeMemoryDMStore =
- new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
+ CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+ memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
}
/**
@@ -565,7 +577,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
* @throws MemoryException
*/
private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
- byte[] filePath, byte[] fileName, byte[] segmentId)
+ byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
throws MemoryException {
List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
@@ -581,8 +593,9 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
// for storing segmentid
taskMinMaxSchemas.add(
new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
- unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
- taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+ CarbonRowSchema[] schema =
+ taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
+ summaryDMStore = getMemoryDMStore(schema, addToUnsafe);
}
private void getMinMaxSchema(SegmentProperties segmentProperties,
@@ -611,8 +624,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
public boolean isScanRequired(FilterResolverIntf filterExp) {
FilterExecuter filterExecuter =
FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
- for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
+ for (int i = 0; i < summaryDMStore.getRowCount(); i++) {
+ DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i);
boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
@@ -624,26 +637,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
}
private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
- if (unsafeMemoryDMStore.getRowCount() == 0) {
+ if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
List<Blocklet> blocklets = new ArrayList<>();
int numBlocklets = 0;
if (filterExp == null) {
- numBlocklets = unsafeMemoryDMStore.getRowCount();
+ numBlocklets = memoryDMStore.getRowCount();
for (int i = 0; i < numBlocklets; i++) {
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+ DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow();
blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
}
} else {
// Remove B-tree jump logic as start and end key prepared is not
// correct for old store scenarios
int startIndex = 0;
- numBlocklets = unsafeMemoryDMStore.getRowCount();
+ numBlocklets = memoryDMStore.getRowCount();
FilterExecuter filterExecuter =
FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
while (startIndex < numBlocklets) {
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+ DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow();
int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
@@ -663,7 +676,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
@Override
public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
List<PartitionSpec> partitions) {
- if (unsafeMemoryDMStore.getRowCount() == 0) {
+ if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
// if it has partitioned datamap but there is no partitioned information stored, it means
@@ -740,10 +753,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
int index = Integer.parseInt(blockletId);
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+ DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow();
return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
}
+ /**
+ * Get the index file name of the blocklet data map
+ *
+ * @return
+ */
+ public String getIndexFileName() {
+ DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
+ try {
+ return new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
+ CarbonCommonConstants.DEFAULT_CHARSET);
+ } catch (UnsupportedEncodingException e) {
+ // should never happen!
+ throw new IllegalArgumentException("UTF8 encoding is not supported", e);
+ }
+ }
+
private byte[][] getMinMaxValue(DataMapRow row, int index) {
DataMapRow minMaxRow = row.getRow(index);
byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
@@ -764,23 +793,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
detailInfo.setBlockletId((short) blockletId);
detailInfo.setDimLens(columnCardinality);
detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
- byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
- BlockletInfo blockletInfo = null;
+ detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCK_INFO_INDEX));
try {
- if (byteArray.length > 0) {
- blockletInfo = new BlockletInfo();
- ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
- DataInputStream inputStream = new DataInputStream(stream);
- blockletInfo.readFields(inputStream);
- inputStream.close();
- }
blocklet.setLocation(
new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
.split(","));
} catch (IOException e) {
throw new RuntimeException(e);
}
- detailInfo.setBlockletInfo(blockletInfo);
blocklet.setDetailInfo(detailInfo);
detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
@@ -791,7 +811,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
private String[] getFileDetails() {
try {
String[] fileDetails = new String[3];
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+ DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
fileDetails[0] =
new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
@@ -815,14 +835,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
int childNodeIndex;
int low = 0;
- int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int high = memoryDMStore.getRowCount() - 1;
int mid = 0;
int compareRes = -1;
//
while (low <= high) {
mid = (low + high) >>> 1;
// compare the entries
- compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
if (compareRes < 0) {
high = mid - 1;
} else if (compareRes > 0) {
@@ -831,7 +851,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
// if key is matched then get the first entry
int currentPos = mid;
while (currentPos - 1 >= 0
- && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+ && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) {
currentPos--;
}
mid = currentPos;
@@ -863,14 +883,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
int childNodeIndex;
int low = 0;
- int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int high = memoryDMStore.getRowCount() - 1;
int mid = 0;
int compareRes = -1;
//
while (low <= high) {
mid = (low + high) >>> 1;
// compare the entries
- compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
if (compareRes < 0) {
high = mid - 1;
} else if (compareRes > 0) {
@@ -878,8 +898,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
} else {
int currentPos = mid;
// if key is matched then get the first entry
- while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
- && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+ while (currentPos + 1 < memoryDMStore.getRowCount()
+ && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) {
currentPos++;
}
mid = currentPos;
@@ -907,13 +927,13 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
buffer.putInt(key.getNoDictionaryKeys().length);
buffer.put(key.getDictionaryKeys());
buffer.put(key.getNoDictionaryKeys());
- DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+ DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema());
dataMapRow.setByteArray(buffer.array(), 0);
return dataMapRow;
}
private byte[] getColumnSchemaBinary() {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+ DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
return unsafeRow.getByteArray(SCHEMA);
}
@@ -937,36 +957,25 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
@Override
public void clear() {
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.freeMemory();
- unsafeMemoryDMStore = null;
+ if (memoryDMStore != null) {
+ memoryDMStore.freeMemory();
+ memoryDMStore = null;
segmentProperties = null;
}
// clear task min/max unsafe memory
- if (null != unsafeMemorySummaryDMStore) {
- unsafeMemorySummaryDMStore.freeMemory();
- unsafeMemorySummaryDMStore = null;
+ if (null != summaryDMStore) {
+ summaryDMStore.freeMemory();
+ summaryDMStore = null;
}
}
- @Override
- public long getFileTimeStamp() {
- return 0;
- }
-
- @Override
- public int getAccessCount() {
- return 0;
- }
-
- @Override
public long getMemorySize() {
long memoryUsed = 0L;
- if (unsafeMemoryDMStore != null) {
- memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+ if (memoryDMStore != null) {
+ memoryUsed += memoryDMStore.getMemoryUsed();
}
- if (null != unsafeMemorySummaryDMStore) {
- memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
+ if (null != summaryDMStore) {
+ memoryUsed += summaryDMStore.getMemoryUsed();
}
return memoryUsed;
}
@@ -975,4 +984,65 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
return segmentProperties;
}
+ public void setSegmentProperties(SegmentProperties segmentProperties) {
+ this.segmentProperties = segmentProperties;
+ }
+
+ public int[] getColumnCardinality() {
+ return columnCardinality;
+ }
+
+ private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
+ throws MemoryException {
+ AbstractMemoryDMStore memoryDMStore;
+ if (addToUnsafe) {
+ memoryDMStore = new UnsafeMemoryDMStore(schema);
+ } else {
+ memoryDMStore = new SafeMemoryDMStore(schema);
+ }
+ return memoryDMStore;
+ }
+
+ /**
+ * This method will ocnvert safe to unsafe memory DM store
+ *
+ * @throws MemoryException
+ */
+ public void convertToUnsafeDMStore() throws MemoryException {
+ if (memoryDMStore instanceof SafeMemoryDMStore) {
+ UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore();
+ memoryDMStore.freeMemory();
+ memoryDMStore = unsafeMemoryDMStore;
+ }
+ if (summaryDMStore instanceof SafeMemoryDMStore) {
+ UnsafeMemoryDMStore unsafeSummaryMemoryDMStore = summaryDMStore.convertToUnsafeDMStore();
+ summaryDMStore.freeMemory();
+ summaryDMStore = unsafeSummaryMemoryDMStore;
+ }
+ }
+
+ /**
+ * Read column schema from binary
+ * @param schemaArray
+ * @throws IOException
+ */
+ public List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
+ // uncompress it.
+ schemaArray = Snappy.uncompress(schemaArray);
+ ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
+ DataInput schemaInput = new DataInputStream(schemaStream);
+ List<ColumnSchema> columnSchemas = new ArrayList<>();
+ int size = schemaInput.readShort();
+ for (int i = 0; i < size; i++) {
+ ColumnSchema columnSchema = new ColumnSchema();
+ columnSchema.readFields(schemaInput);
+ columnSchemas.add(columnSchema);
+ }
+ return columnSchemas;
+ }
+
+ public long getBlockletSchemaTime() {
+ return blockletSchemaTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 99e48a5..7cdf77d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.indexstore.blockletindex;
import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
/**
* This class contains required information to make the Blocklet datamap distributable.
@@ -31,6 +32,8 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
*/
private String filePath;
+ private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
public BlockletDataMapDistributable(String indexFilePath) {
this.filePath = indexFilePath;
}
@@ -38,4 +41,13 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
public String getFilePath() {
return filePath;
}
+
+ public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() {
+ return tableBlockIndexUniqueIdentifier;
+ }
+
+ public void setTableBlockIndexUniqueIdentifier(
+ TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifiers) {
+ this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifiers;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index c0bc2a6..c3df721 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -21,13 +21,16 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -38,15 +41,17 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -59,7 +64,7 @@ import org.apache.hadoop.fs.RemoteIterator;
* Table map for blocklet
*/
public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
- implements BlockletDetailsFetcher, SegmentPropertiesFetcher {
+ implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableDataMap {
private static final String NAME = "clustered.btree.blocklet";
@@ -69,9 +74,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
private AbsoluteTableIdentifier identifier;
// segmentId -> list of index file
- private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+ private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
- private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache;
+ private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
super(carbonTable, dataMapSchema);
@@ -91,24 +96,27 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
}
+ /**
+ * Returns every datamap of the given segment, flattening the cached wrapper of each
+ * index file identifier into a single list.
+ *
+ * @param segment segment whose datamaps are requested
+ * @return all datamaps held by the wrappers of the segment's index identifiers
+ * @throws IOException if resolving identifiers or loading the cache fails
+ */
@Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
- List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
+ Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
- return cache.getAll(tableBlockIndexUniqueIdentifiers);
+ // hand the cache a List copy instead of the Set that is kept in segmentMap
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ new ArrayList<>(identifiers);
+ List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
+ cache.getAll(tableBlockIndexUniqueIdentifiers);
+ // each wrapper carries a list of datamaps; merge them into one result
+ for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) {
+ dataMaps.addAll(wrapper.getDataMaps());
+ }
+ return dataMaps;
}
- private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+ private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
throws IOException {
- List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segment.getSegmentNo());
if (tableBlockIndexUniqueIdentifiers == null) {
- tableBlockIndexUniqueIdentifiers = new ArrayList<>();
- Map<String, String> indexFiles = segment.getCommittedIndexFile();
- for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
- Path indexFile = new Path(indexFileEntry.getKey());
- tableBlockIndexUniqueIdentifiers.add(
- new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
- indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
- }
+ tableBlockIndexUniqueIdentifiers =
+ BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
}
return tableBlockIndexUniqueIdentifiers;
@@ -130,7 +138,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
}
return detailedBlocklets;
}
- List<TableBlockIndexUniqueIdentifier> identifiers =
+ Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
// Retrieve each blocklets detail information from blocklet datamap
for (Blocklet blocklet : blocklets) {
@@ -145,17 +153,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
if (blocklet instanceof ExtendedBlocklet) {
return (ExtendedBlocklet) blocklet;
}
- List<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segment);
+ Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
return getExtendedBlocklet(identifiers, blocklet);
}
- private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
+ private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
Blocklet blocklet) throws IOException {
for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
- if (identifier.getIndexFileName().startsWith(blocklet.getFilePath())) {
- DataMap dataMap = cache.get(identifier);
- return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+ BlockletDataMapIndexWrapper wrapper = cache.get(identifier);
+ List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+ for (DataMap dataMap : dataMaps) {
+ if (((BlockletDataMap) dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) {
+ return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+ }
}
}
throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found ");
@@ -166,23 +176,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> distributables = new ArrayList<>();
try {
- CarbonFile[] carbonIndexFiles;
- if (segment.getSegmentFileName() == null) {
- carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
- } else {
- SegmentFileStore fileStore =
- new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
- Map<String, String> indexFiles = fileStore.getIndexFiles();
- carbonIndexFiles = new CarbonFile[indexFiles.size()];
- int i = 0;
- for (String indexFile : indexFiles.keySet()) {
- carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
- }
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ getTableBlockIndexUniqueIdentifiers(segment);
+ CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()];
+ int identifierCounter = 0;
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath();
+ String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName();
+ carbonIndexFiles[identifierCounter++] = FileFactory
+ .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName);
}
for (int i = 0; i < carbonIndexFiles.length; i++) {
Path path = new Path(carbonIndexFiles[i].getPath());
-
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
LocatedFileStatus fileStatus = iter.next();
@@ -205,13 +211,18 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
@Override
public void clear(Segment segment) {
- List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
+ Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
if (blockIndexes != null) {
for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
- DataMap dataMap = cache.getIfPresent(blockIndex);
- if (dataMap != null) {
- cache.invalidate(blockIndex);
- dataMap.clear();
+ BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndex);
+ if (null != wrapper) {
+ List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+ for (DataMap dataMap : dataMaps) {
+ if (dataMap != null) {
+ cache.invalidate(blockIndex);
+ dataMap.clear();
+ }
+ }
}
}
}
@@ -246,9 +257,12 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
segmentNo));
}
}
- List<CoarseGrainDataMap> dataMaps;
+ List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
try {
- dataMaps = cache.getAll(identifiers);
+ List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiers);
+ for (BlockletDataMapIndexWrapper wrapper : wrappers) {
+ dataMaps.addAll(wrapper.getDataMaps());
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -289,4 +303,29 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
return false;
}
+ /**
+ * Stores the given wrapper in this factory's datamap cache under the given identifier.
+ *
+ * @param tableBlockIndexUniqueIdentifier key of the index file
+ * @param blockletDataMapIndexWrapper datamaps loaded for that index file
+ */
+ @Override
+ public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ BlockletDataMapIndexWrapper blockletDataMapIndexWrapper)
+ throws IOException, MemoryException {
+ // the field shares its name with this method, so qualify it explicitly
+ this.cache.put(tableBlockIndexUniqueIdentifier, blockletDataMapIndexWrapper);
+ }
+
+ /**
+ * Returns the distributables whose index identifier is not present in the cache yet.
+ * Each returned distributable has that identifier recorded on it.
+ *
+ * @param distributables candidates, expected to be BlockletDataMapDistributable instances
+ * @return the subset that still has to be loaded, in input order
+ * @throws IOException if a segment's index identifiers cannot be resolved
+ */
+ @Override
+ public List<DataMapDistributable> getAllUncachedDistributables(
+ List<DataMapDistributable> distributables) throws IOException {
+ List<DataMapDistributable> uncached = new ArrayList<>(distributables.size());
+ for (DataMapDistributable candidate : distributables) {
+ Set<TableBlockIndexUniqueIdentifier> segmentIdentifiers =
+ getTableBlockIndexUniqueIdentifiers(candidate.getSegment());
+ BlockletDataMapDistributable blockletDistributable =
+ (BlockletDataMapDistributable) candidate;
+ // narrow the segment's identifiers down to the one this distributable refers to
+ TableBlockIndexUniqueIdentifier matchingIdentifier = BlockletDataMapUtil
+ .filterIdentifiersBasedOnDistributable(segmentIdentifiers, blockletDistributable);
+ // NOTE(review): if the filter can return null when nothing matches, that null is looked
+ // up in the cache and stored on the distributable -- confirm BlockletDataMapUtil's contract.
+ if (cache.getIfPresent(matchingIdentifier) != null) {
+ continue;
+ }
+ blockletDistributable.setTableBlockIndexUniqueIdentifier(matchingIdentifier);
+ uncached.add(candidate);
+ }
+ return uncached;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index ebeb278..7443d15 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -32,6 +32,8 @@ public class BlockletDataMapModel extends DataMapModel {
+ // Segment id handed to the constructor.
private String segmentId;
+ // Defaults to true; only the five-argument constructor can override it. Presumably selects
+ // whether the loaded datamap is kept in unsafe memory -- TODO(review) confirm with the loader.
+ private boolean addToUnsafe = true;
+
public BlockletDataMapModel(String filePath, byte[] fileData,
Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
super(filePath);
@@ -40,6 +42,12 @@ public class BlockletDataMapModel extends DataMapModel {
this.segmentId = segmentId;
}
+ /**
+ * Builds the model exactly like the four-argument constructor, then overrides the
+ * addToUnsafe flag, which otherwise stays true.
+ *
+ * @param addToUnsafe value returned afterwards by {@link #isAddToUnsafe()}
+ */
+ public BlockletDataMapModel(String filePath, byte[] fileData,
+ Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId,
+ boolean addToUnsafe) {
+ this(filePath, fileData, blockMetaInfoMap, segmentId);
+ this.addToUnsafe = addToUnsafe;
+ }
+
+ /**
+ * Returns the {@code fileData} byte array held by this model (no defensive copy is made).
+ */
public byte[] getFileData() {
return fileData;
}
@@ -51,4 +59,8 @@ public class BlockletDataMapModel extends DataMapModel {
+ /**
+ * Returns the segment id this model was constructed with.
+ */
public String getSegmentId() {
return segmentId;
}
+
+ /**
+ * Returns the addToUnsafe flag: true unless the five-argument constructor set it otherwise.
+ */
+ public boolean isAddToUnsafe() {
+ return this.addToUnsafe;
+ }
}