You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/16 17:06:06 UTC
carbondata git commit: [CARBONDATA-2350][DataMap] Fix bugs in minmax
datamap example
Repository: carbondata
Updated Branches:
refs/heads/master 4c9bed8bc -> ecd6c0c54
[CARBONDATA-2350][DataMap] Fix bugs in minmax datamap example
Fix bugs in minmax datamap example
This closes #2174
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ecd6c0c5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ecd6c0c5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ecd6c0c5
Branch: refs/heads/master
Commit: ecd6c0c54405c91434d1cbca2894b635318f7e4a
Parents: 4c9bed8
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Mon Apr 16 10:55:10 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Apr 17 01:05:32 2018 +0800
----------------------------------------------------------------------
.../core/datastore/page/ColumnPage.java | 57 ++++++
.../datamap/examples/MinMaxDataWriter.java | 195 ++++++++++++++-----
.../datamap/examples/MinMaxIndexDataMap.java | 77 +++++---
.../examples/MinMaxIndexDataMapFactory.java | 60 +++++-
.../MinMaxDataMapExample.scala | 77 --------
.../datamap/examples/MinMaxDataMapSuite.scala | 127 ++++++++++++
.../datamap/lucene/LuceneDataMapWriter.java | 2 +-
7 files changed, 433 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index ebca3b7..68269fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -370,6 +370,38 @@ public abstract class ColumnPage {
}
/**
+ * get the value at rowId; note that string & bytes values are in LV (length-value) format
+ * @param rowId rowId
+ * @return value
+ */
+ public Object getData(int rowId) {
+ if (nullBitSet.get(rowId)) {
+ return getNull(rowId);
+ }
+ if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+ byte value = getByte(rowId);
+ if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+ return BooleanConvert.byte2Boolean(value);
+ }
+ return value;
+ } else if (dataType == DataTypes.SHORT) {
+ return getShort(rowId);
+ } else if (dataType == DataTypes.INT) {
+ return getInt(rowId);
+ } else if (dataType == DataTypes.LONG) {
+ return getLong(rowId);
+ } else if (dataType == DataTypes.DOUBLE) {
+ return getDouble(rowId);
+ } else if (DataTypes.isDecimal(dataType)) {
+ return getDecimal(rowId);
+ } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+ return getBytes(rowId);
+ } else {
+ throw new RuntimeException("unsupported data type: " + dataType);
+ }
+ }
+
+ /**
* Set byte value at rowId
*/
public abstract void putByte(int rowId, byte value);
@@ -446,6 +478,31 @@ public abstract class ColumnPage {
}
/**
+ * Get null at rowId
+ */
+ private Object getNull(int rowId) {
+ Object result;
+ if (dataType == DataTypes.BOOLEAN) {
+ result = getBoolean(rowId);
+ } else if (dataType == DataTypes.BYTE) {
+ result = getByte(rowId);
+ } else if (dataType == DataTypes.SHORT) {
+ result = getShort(rowId);
+ } else if (dataType == DataTypes.INT) {
+ result = getInt(rowId);
+ } else if (dataType == DataTypes.LONG) {
+ result = getLong(rowId);
+ } else if (dataType == DataTypes.DOUBLE) {
+ result = getDouble(rowId);
+ } else if (DataTypes.isDecimal(dataType)) {
+ result = getDecimal(rowId);
+ } else {
+ throw new IllegalArgumentException("unsupported data type: " + dataType);
+ }
+ return result;
+ }
+
+ /**
* Get byte value at rowId
*/
public abstract byte getByte(int rowId);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index e116825..d2dbaa5 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.datamap.examples;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
@@ -33,39 +34,63 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import com.google.gson.Gson;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
public class MinMaxDataWriter extends DataMapWriter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(TableInfo.class.getName());
- private byte[][] pageLevelMin, pageLevelMax;
-
- private byte[][] blockletLevelMin, blockletLevelMax;
+ private Object[] pageLevelMin, pageLevelMax;
private Map<Integer, BlockletMinMax> blockMinMaxMap;
- private String dataWritePath;
+ private String dataMapName;
+ private int columnCnt;
+ private DataType[] dataTypeArray;
- public MinMaxDataWriter(AbsoluteTableIdentifier identifier, Segment segment,
+ /**
+ * Since the sequence of indexed columns is the same as the user-created column order,
+ * we map each column index in the user-created order to its index in MinMaxIndex.
+ * Please note that the sequence of min-max values for each column in the blocklet min-max is
+ * not that of the indexed columns, so we need to reorder the values while writing the min-max
+ */
+ private Map<Integer, Integer> origin2MinMaxOrdinal = new HashMap<>();
+
+ public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String dataMapName, Segment segment,
String dataWritePath) {
super(identifier, segment, dataWritePath);
- this.identifier = identifier;
- this.segmentId = segment.getSegmentNo();
- this.dataWritePath = dataWritePath;
+ this.dataMapName = dataMapName;
+ CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+ identifier.getDatabaseName(), identifier.getTableName());
+ List<CarbonColumn> cols = carbonTable.getCreateOrderColumn(identifier.getTableName());
+ this.columnCnt = cols.size();
+ List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(identifier.getTableName());
+ for (int i = 0; i < dimensions.size(); i++) {
+ this.origin2MinMaxOrdinal.put(dimensions.get(i).getSchemaOrdinal(),
+ dimensions.get(i).getOrdinal());
+ }
+ List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(identifier.getTableName());
+ for (int i = 0; i < measures.size(); i++) {
+ this.origin2MinMaxOrdinal.put(measures.get(i).getSchemaOrdinal(),
+ dimensions.size() + measures.get(i).getOrdinal());
+ }
}
@Override public void onBlockStart(String blockId) {
- pageLevelMax = null;
- pageLevelMin = null;
- blockletLevelMax = null;
- blockletLevelMin = null;
- blockMinMaxMap = null;
blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
}
@@ -74,10 +99,12 @@ public class MinMaxDataWriter extends DataMapWriter {
}
@Override public void onBlockletStart(int blockletId) {
+ pageLevelMin = new Object[columnCnt];
+ pageLevelMax = new Object[columnCnt];
}
@Override public void onBlockletEnd(int blockletId) {
- updateBlockletMinMax(blockletId);
+ updateCurrentBlockletMinMax(blockletId);
}
@Override
@@ -106,48 +133,99 @@ public class MinMaxDataWriter extends DataMapWriter {
// pages[0].getStatistics().getMax());
// }
- byte[] value = new byte[pages[0].getBytes(0).length - 2];
- if (pageLevelMin == null && pageLevelMax == null) {
- pageLevelMin = new byte[2][];
- pageLevelMax = new byte[2][];
-
- System.arraycopy(pages[0].getBytes(0), 2, value, 0, value.length);
- pageLevelMin[1] = value;
- pageLevelMax[1] = value;
+ if (this.dataTypeArray == null) {
+ this.dataTypeArray = new DataType[this.columnCnt];
+ for (int i = 0; i < this.columnCnt; i++) {
+ this.dataTypeArray[i] = pages[i].getDataType();
+ }
+ }
- } else {
- for (int rowIndex = 0; rowIndex < pages[0].getPageSize(); rowIndex++) {
- System.arraycopy(pages[0].getBytes(rowIndex), 2, value, 0, value.length);
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], value) > 0) {
- pageLevelMin[1] = value;
- }
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], value) < 0) {
- pageLevelMax[1] = value;
+ // As an example, we don't use the page-level min-max generated by native carbondata here;
+ // instead, we compute the min-max by comparing each row
+ for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
+ for (int colIdx = 0; colIdx < columnCnt; colIdx++) {
+ Object originValue = pages[colIdx].getData(rowId);
+ // for string & bytes_array, data is prefixed with length, need to remove it
+ if (DataTypes.STRING == pages[colIdx].getDataType()
+ || DataTypes.BYTE_ARRAY == pages[colIdx].getDataType()) {
+ byte[] valueMin0 = (byte[]) pageLevelMin[colIdx];
+ byte[] valueMax0 = (byte[]) pageLevelMax[colIdx];
+ byte[] value1 = (byte[]) originValue;
+ if (pageLevelMin[colIdx] == null || ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(valueMin0, 0, valueMin0.length, value1, 2, value1.length - 2) > 0) {
+ pageLevelMin[colIdx] = new byte[value1.length - 2];
+ System.arraycopy(value1, 2, (byte[]) pageLevelMin[colIdx], 0, value1.length - 2);
+ }
+ if (pageLevelMax[colIdx] == null || ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(valueMax0, 0, valueMax0.length, value1, 2, value1.length - 2) < 0) {
+ pageLevelMax[colIdx] = new byte[value1.length - 2];
+ System.arraycopy(value1, 2, (byte[]) pageLevelMax[colIdx], 0, value1.length - 2);
+ }
+ } else if (DataTypes.INT == pages[colIdx].getDataType()) {
+ updateMinMax(colIdx, originValue, pages[colIdx].getDataType());
+ } else {
+ throw new RuntimeException("Not implement yet");
}
}
}
}
- private void updateBlockletMinMax(int blockletId) {
- if (blockletLevelMax == null || blockletLevelMin == null) {
- blockletLevelMax = new byte[2][];
- blockletLevelMin = new byte[2][];
- if (pageLevelMax != null || pageLevelMin != null) {
- blockletLevelMin = pageLevelMin;
- blockletLevelMax = pageLevelMax;
+ private void updateMinMax(int colIdx, Object originValue, DataType dataType) {
+ if (pageLevelMin[colIdx] == null) {
+ pageLevelMin[colIdx] = originValue;
+ }
+ if (pageLevelMax[colIdx] == null) {
+ pageLevelMax[colIdx] = originValue;
+ }
+
+ if (DataTypes.SHORT == dataType) {
+ if (pageLevelMin[colIdx] == null || (short) pageLevelMin[colIdx] - (short) originValue > 0) {
+ pageLevelMin[colIdx] = originValue;
}
- } else {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMin[1], pageLevelMin[1]) > 0) {
- blockletLevelMin = pageLevelMin;
+ if (pageLevelMax[colIdx] == null || (short) pageLevelMax[colIdx] - (short) originValue < 0) {
+ pageLevelMax[colIdx] = originValue;
}
-
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMax[1], pageLevelMax[1]) > 0) {
- blockletLevelMax = pageLevelMax;
+ } else if (DataTypes.INT == dataType) {
+ if (pageLevelMin[colIdx] == null || (int) pageLevelMin[colIdx] - (int) originValue > 0) {
+ pageLevelMin[colIdx] = originValue;
+ }
+ if (pageLevelMax[colIdx] == null || (int) pageLevelMax[colIdx] - (int) originValue < 0) {
+ pageLevelMax[colIdx] = originValue;
}
+ } else if (DataTypes.LONG == dataType) {
+ if (pageLevelMin[colIdx] == null || (long) pageLevelMin[colIdx] - (long) originValue > 0) {
+ pageLevelMin[colIdx] = originValue;
+ }
+ if (pageLevelMax[colIdx] == null || (long) pageLevelMax[colIdx] - (long) originValue < 0) {
+ pageLevelMax[colIdx] = originValue;
+ }
+ } else if (DataTypes.DOUBLE == dataType) {
+ if (pageLevelMin[colIdx] == null
+ || (double) pageLevelMin[colIdx] - (double) originValue > 0) {
+ pageLevelMin[colIdx] = originValue;
+ }
+ if (pageLevelMax[colIdx] == null
+ || (double) pageLevelMax[colIdx] - (double) originValue < 0) {
+ pageLevelMax[colIdx] = originValue;
+ }
+ } else {
+ // todo:
+ throw new RuntimeException("Not implemented yet");
}
+ }
+
+ private void updateCurrentBlockletMinMax(int blockletId) {
+ byte[][] max = new byte[this.columnCnt][];
+ byte[][] min = new byte[this.columnCnt][];
+ for (int i = 0; i < this.columnCnt; i++) {
+ int targetColIdx = origin2MinMaxOrdinal.get(i);
+ max[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], pageLevelMax[i]);
+ min[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], pageLevelMin[i]);
+ }
+
BlockletMinMax blockletMinMax = new BlockletMinMax();
- blockletMinMax.setMax(blockletLevelMax);
- blockletMinMax.setMin(blockletLevelMin);
+ blockletMinMax.setMax(max);
+ blockletMinMax.setMin(min);
blockMinMaxMap.put(blockletId, blockletMinMax);
}
@@ -156,8 +234,6 @@ public class MinMaxDataWriter extends DataMapWriter {
constructMinMaxIndex(blockId);
}
-
-
/**
* Construct the Min Max Index.
* @param blockId
@@ -178,9 +254,9 @@ public class MinMaxDataWriter extends DataMapWriter {
*/
private List<MinMaxIndexBlockDetails> loadBlockDetails() {
List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails = new ArrayList<MinMaxIndexBlockDetails>();
- MinMaxIndexBlockDetails tmpminMaxIndexBlockDetails = new MinMaxIndexBlockDetails();
for (int index = 0; index < blockMinMaxMap.size(); index++) {
+ MinMaxIndexBlockDetails tmpminMaxIndexBlockDetails = new MinMaxIndexBlockDetails();
tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin());
tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax());
tmpminMaxIndexBlockDetails.setBlockletId(index);
@@ -197,7 +273,8 @@ public class MinMaxDataWriter extends DataMapWriter {
*/
public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
String blockId) throws IOException {
- String filePath = dataWritePath +"/" + blockId + ".minmaxindex";
+ String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName);
+ String filePath = dataMapDir + File.separator + blockId + ".minmaxindex";
BufferedWriter brWriter = null;
DataOutputStream dataOutStream = null;
try {
@@ -209,6 +286,7 @@ public class MinMaxDataWriter extends DataMapWriter {
brWriter.write(minmaxIndexData);
} catch (IOException ioe) {
LOGGER.info("Error in writing minMaxindex file");
+ throw ioe;
} finally {
if (null != brWriter) {
brWriter.flush();
@@ -224,4 +302,23 @@ public class MinMaxDataWriter extends DataMapWriter {
@Override public void finish() throws IOException {
}
+
+ /**
+ * Create and return the path that will store the datamap
+ *
+ * @param dataPath path where the carbondata fact data is stored
+ * @param dataMapName datamap name
+ * @return path to store the datamap
+ * @throws IOException
+ */
+ public static String genDataMapStorePath(String dataPath, String dataMapName)
+ throws IOException {
+ String dmDir = dataPath + File.separator + dataMapName;
+ Path dmPath = FileFactory.getPath(dmDir);
+ FileSystem fs = FileFactory.getFileSystem(dmPath);
+ if (!fs.exists(dmPath)) {
+ fs.mkdirs(dmPath);
+ }
+ return dmDir;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index bad22b2..ac6358e 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.datamap.examples;
import java.io.BufferedReader;
import java.io.DataInputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
@@ -30,14 +31,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
@@ -45,6 +43,10 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonUtil;
import com.google.gson.Gson;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
/**
* Datamap implementation for min max blocklet.
@@ -54,29 +56,36 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
private static final LogService LOGGER =
LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName());
- public static final String NAME = "clustered.minmax.btree.blocklet";
+ private String[] indexFilePath;
- private String filePath;
-
- private MinMaxIndexBlockDetails[] readMinMaxDataMap;
+ private MinMaxIndexBlockDetails[][] readMinMaxDataMap;
@Override
public void init(DataMapModel model) throws MemoryException, IOException {
- this.filePath = model.getFilePath();
- CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
- for (int i = 0; i < listFiles.length; i++) {
- readMinMaxDataMap = readJson(listFiles[i].getPath());
+ Path indexPath = FileFactory.getPath(model.getFilePath());
+
+ FileSystem fs = FileFactory.getFileSystem(indexPath);
+ if (!fs.exists(indexPath)) {
+ throw new IOException(
+ String.format("Path %s for MinMax index dataMap does not exist", indexPath));
+ }
+ if (!fs.isDirectory(indexPath)) {
+ throw new IOException(
+ String.format("Path %s for MinMax index dataMap must be a directory", indexPath));
}
- }
- private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) {
- String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
- CarbonFile carbonFile = FileFactory.getCarbonFile(path);
- return carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
- return file.getName().endsWith(".minmaxindex");
+ FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() {
+ @Override public boolean accept(Path path) {
+ return path.getName().endsWith(".minmaxindex");
}
});
+
+ this.indexFilePath = new String[indexFileStatus.length];
+ this.readMinMaxDataMap = new MinMaxIndexBlockDetails[indexFileStatus.length][];
+ for (int i = 0; i < indexFileStatus.length; i++) {
+ this.indexFilePath[i] = indexFileStatus[i].getPath().toString();
+ this.readMinMaxDataMap[i] = readJson(this.indexFilePath[i]);
+ }
}
private MinMaxIndexBlockDetails[] readJson(String filePath) {
@@ -118,20 +127,34 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
if (filterExp == null) {
for (int i = 0; i < readMinMaxDataMap.length; i++) {
- blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+ for (int j = 0; j < readMinMaxDataMap[i].length; j++) {
+ blocklets.add(new Blocklet(indexFilePath[i],
+ String.valueOf(readMinMaxDataMap[i][j].getBlockletId())));
+ }
}
} else {
FilterExecuter filterExecuter =
FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
- int startIndex = 0;
- while (startIndex < readMinMaxDataMap.length) {
- BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
- readMinMaxDataMap[startIndex].getMinValues());
- if (!bitSet.isEmpty()) {
- blocklets.add(new Blocklet(filePath,
- String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
+ for (int blkIdx = 0; blkIdx < readMinMaxDataMap.length; blkIdx++) {
+ for (int blkltIdx = 0; blkltIdx < readMinMaxDataMap[blkIdx].length; blkltIdx++) {
+
+ BitSet bitSet = filterExecuter.isScanRequired(
+ readMinMaxDataMap[blkIdx][blkltIdx].getMaxValues(),
+ readMinMaxDataMap[blkIdx][blkltIdx].getMinValues());
+ if (!bitSet.isEmpty()) {
+ String blockFileName = indexFilePath[blkIdx].substring(
+ indexFilePath[blkIdx].lastIndexOf(File.separatorChar) + 1,
+ indexFilePath[blkIdx].indexOf(".minmaxindex"));
+ Blocklet blocklet = new Blocklet(blockFileName,
+ String.valueOf(readMinMaxDataMap[blkIdx][blkltIdx].getBlockletId()));
+ LOGGER.info(String.format("MinMaxDataMap: Need to scan block#%s -> blocklet#%s, %s",
+ blkIdx, blkltIdx, blocklet));
+ blocklets.add(blocklet);
+ } else {
+ LOGGER.info(String.format("MinMaxDataMap: Skip scan block#%s -> blocklet#%s",
+ blkIdx, blkltIdx));
+ }
}
- startIndex++;
}
}
return blocklets;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index a2f92c9..758a67c 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -19,9 +19,11 @@ package org.apache.carbondata.datamap.examples;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
@@ -31,21 +33,61 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Transformer;
+import org.apache.commons.lang3.StringUtils;
+
/**
* Min Max DataMap Factory
*/
public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
-
+ private static final LogService LOGGER = LogServiceFactory.getLogService(
+ MinMaxIndexDataMapFactory.class.getName());
+ private DataMapMeta dataMapMeta;
+ private String dataMapName;
private AbsoluteTableIdentifier identifier;
- @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
+ // this is an example for datamap, we can choose the columns and operations that
+ // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
+ @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+ throws IOException, MalformedDataMapCommandException {
this.identifier = identifier;
+ this.dataMapName = dataMapSchema.getDataMapName();
+
+ String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+ CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName);
+ if (null == carbonTable) {
+ throw new IOException("Failed to get carbon table with name " + tableUniqueName);
+ }
+
+ // columns that will be indexed
+ List<CarbonColumn> allColumns = carbonTable.getCreateOrderColumn(identifier.getTableName());
+ List<String> minMaxCols = (List) CollectionUtils.collect(allColumns, new Transformer() {
+ @Override public Object transform(Object o) {
+ return ((CarbonColumn) o).getColName();
+ }
+ });
+ LOGGER.info("MinMaxDataMap support index columns: " + StringUtils.join(minMaxCols, ", "));
+
+ // operations that will be supported on the indexed columns
+ List<ExpressionType> optOperations = new ArrayList<>();
+ optOperations.add(ExpressionType.EQUALS);
+ optOperations.add(ExpressionType.GREATERTHAN);
+ optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
+ optOperations.add(ExpressionType.LESSTHAN);
+ optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
+ optOperations.add(ExpressionType.NOT_EQUALS);
+ LOGGER.error("MinMaxDataMap support operations: " + StringUtils.join(optOperations, ", "));
+ this.dataMapMeta = new DataMapMeta(minMaxCols, optOperations);
}
/**
@@ -55,7 +97,7 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
* @return
*/
@Override public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
- return new MinMaxDataWriter(identifier, segment, writeDirectoryPath);
+ return new MinMaxDataWriter(identifier, dataMapName, segment, writeDirectoryPath);
}
/**
@@ -75,7 +117,10 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
try {
dataMap.init(new DataMapModel(
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())));
+ MinMaxDataWriter.genDataMapStorePath(
+ CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), segment.getSegmentNo()),
+ dataMapName)));
} catch (MemoryException ex) {
throw new IOException(ex);
}
@@ -108,7 +153,7 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
@Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
ReadCommittedScope readCommittedScope)
throws IOException {
- return null;
+ return getDataMaps(distributable.getSegment(), readCommittedScope);
}
@Override public void fireEvent(Event event) {
@@ -116,7 +161,6 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
}
@Override public DataMapMeta getMeta() {
- return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
- new ArrayList<ExpressionType>());
+ return this.dataMapMeta;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
deleted file mode 100644
index 59872aa..0000000
--- a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.datamap.examples
-
-import java.io.File
-
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
-
-object MinMaxDataMapExample {
- def main(args: Array[String]): Unit = {
-
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "").getCanonicalPath
- val storeLocation = s"$rootPath/dataMap/examples/target/store"
- val warehouse = s"$rootPath/datamap/examples/target/warehouse"
- val metastoredb = s"$rootPath/datamap/examples/target"
-
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
- import org.apache.spark.sql.CarbonSession._
-
- val spark = SparkSession
- .builder()
- .master("local")
- .appName("CarbonDataMapExample")
- .config("spark.sql.warehouse.dir", warehouse)
- .getOrCreateCarbonSession(storeLocation)
-
- spark.sparkContext.setLogLevel("ERROR")
- import spark.implicits._
-
- // register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"),
- classOf[MinMaxIndexDataMapFactory].getName,
- MinMaxIndexDataMap.NAME)
-
- spark.sql("DROP TABLE IF EXISTS carbonminmax")
-
- val df = spark.sparkContext.parallelize(1 to 33000)
- .map(x => ("a", "b", x))
- .toDF("c1", "c2", "c3")
-
- // save dataframe to carbon file
- df.write
- .format("carbondata")
- .option("tableName", "carbonminmax")
- .mode(SaveMode.Overwrite)
- .save()
-
- // Query the table.
- spark.sql("select c2 from carbonminmax").show(20, false)
- spark.sql("select c2 from carbonminmax where c2 = 'b'").show(20, false)
- spark.sql("DROP TABLE IF EXISTS carbonminmax")
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
new file mode 100644
index 0000000..429dac2
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.examples
+
+import java.io.{File, PrintWriter}
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class MinMaxDataMapSuite extends QueryTest with BeforeAndAfterAll {
+ val inputFile = s"$resourcesPath/minmax_datamap_input.csv"
+ val normalTable = "carbonNormal"
+ val minMaxDMSampleTable = "carbonMinMax"
+ val dataMapName = "minmax_dm"
+ val lineNum = 500000
+
+ override protected def beforeAll(): Unit = {
+ createFile(inputFile, line = lineNum, start = 0)
+ sql(s"DROP TABLE IF EXISTS $normalTable")
+ sql(s"DROP TABLE IF EXISTS $minMaxDMSampleTable")
+ }
+
+ test("test minmax datamap") {
+ sql(
+ s"""
+ | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+ sql(
+ s"""
+ | CREATE TABLE $minMaxDMSampleTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP $dataMapName ON TABLE $minMaxDMSampleTable
+ | USING '${classOf[MinMaxIndexDataMapFactory].getName}'
+ """.stripMargin)
+
+ // Load the same input into both tables so the datamap-backed table can be
+ // cross-checked against the plain table below.
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $minMaxDMSampleTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+
+ sql(s"show datamap on table $minMaxDMSampleTable").show(false)
+ checkAnswer(sql(s"show datamap on table $minMaxDMSampleTable"),
+ Row(dataMapName, classOf[MinMaxIndexDataMapFactory].getName, "(NA)"))
+ // Note that the table will use default dimension as sort_columns, so for the following cases,
+ // the pruning result will differ.
+ // 1 blocklet
+ checkAnswer(sql(s"select * from $minMaxDMSampleTable where id = 1"),
+ sql(s"select * from $normalTable where id = 1"))
+ // 6 blocklet
+ checkAnswer(sql(s"select * from $minMaxDMSampleTable where id = 999"),
+ sql(s"select * from $normalTable where id = 999"))
+ // 1 blocklet
+ checkAnswer(sql(s"select * from $minMaxDMSampleTable where city = 'city_1'"),
+ sql(s"select * from $normalTable where city = 'city_1'"))
+ // 1 blocklet
+ checkAnswer(sql(s"select * from $minMaxDMSampleTable where city = 'city_999'"),
+ sql(s"select * from $normalTable where city = 'city_999'"))
+ // 6 blocklet
+ checkAnswer(sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+ s" count(distinct s1), count(distinct s2) from $minMaxDMSampleTable"),
+ sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+ s" count(distinct s1), count(distinct s2) from $normalTable"))
+ // 6 blocklet
+ checkAnswer(sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+ s" from $minMaxDMSampleTable"),
+ sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+ s" from $normalTable"))
+ }
+
+ override protected def afterAll(): Unit = {
+ deleteFile(inputFile)
+ sql(s"DROP TABLE IF EXISTS $normalTable")
+ sql(s"DROP TABLE IF EXISTS $minMaxDMSampleTable")
+ }
+
+ // Generates a headerless CSV fixture of `line` rows starting at id `start`;
+ // no-op if the file already exists.
+ private def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+ if (!new File(fileName).exists()) {
+ val write = new PrintWriter(new File(fileName))
+ for (i <- start until (start + line)) {
+ write.println(
+ s"$i,n$i,city_$i,${ Random.nextInt(80) }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }")
+ }
+ write.close()
+ }
+ }
+
+ // Removes the generated CSV fixture if present.
+ private def deleteFile(fileName: String): Unit = {
+ val file = new File(fileName)
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 86b2382..4286e5a 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -114,7 +114,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
FileSystem fs = FileFactory.getFileSystem(indexPath);
// if index path not exists, create it
- if (fs.exists(indexPath)) {
+ if (!fs.exists(indexPath)) {
fs.mkdirs(indexPath);
}