You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by qiuchenjian <gi...@git.apache.org> on 2018/11/29 11:40:29 UTC
[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2963#discussion_r237452138
--- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.minmax;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+import org.apache.log4j.Logger;
+
+/**
+ * We will record the min & max value for each index column in each blocklet.
+ * Since the size of index is quite small, we will combine the index for all index columns
+ * in one file.
+ */
+public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
+ private static final Logger LOGGER = LogServiceFactory.getLogService(
+ AbstractMinMaxDataMapWriter.class.getName());
+
+ private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
+ protected int currentBlockletId;
+ private String currentIndexFile;
+ private DataOutputStream currentIndexFileOutStream;
+
+ /**
+  * Creates the writer, sets up one min/max statistics collector per index column and
+  * then creates and opens the index file under the datamap path.
+  *
+  * @param tablePath table path, forwarded to {@link DataMapWriter}
+  * @param dataMapName datamap name, forwarded to {@link DataMapWriter}
+  * @param indexColumns columns whose min/max values are recorded
+  * @param segment segment being written, forwarded to {@link DataMapWriter}
+  * @param shardName shard name, forwarded to {@link DataMapWriter}
+  * @throws IOException if the datamap directory or the index file cannot be created/opened
+  */
+ public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
+ List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
+ super(tablePath, dataMapName, indexColumns, segment, shardName);
+ // collectors are built before the index file is opened
+ initStatsCollector();
+ initDataMapFile();
+ }
+
+ /**
+  * Allocates one min/max statistics collector per index column.
+  *
+  * <p>Two kinds of column get a collector typed by the column's own data type:
+  * measures, and primitive-typed dimensions encoded with neither DICTIONARY nor
+  * DIRECT_DICTIONARY. Every other column gets a byte-array key collector.
+  */
+ private void initStatsCollector() {
+   int columnCount = indexColumns.size();
+   indexColumnMinMaxCollectors = new ColumnPageStatsCollector[columnCount];
+   for (int col = 0; col < columnCount; col++) {
+     CarbonColumn column = indexColumns.get(col);
+     boolean collectAsPrimitive = column.isMeasure()
+         || (column.isDimension()
+             && DataTypeUtil.isPrimitiveColumn(column.getDataType())
+             && !column.hasEncoding(Encoding.DICTIONARY)
+             && !column.hasEncoding(Encoding.DIRECT_DICTIONARY));
+     if (!collectAsPrimitive) {
+       indexColumnMinMaxCollectors[col] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
+     } else {
+       indexColumnMinMaxCollectors[col] =
+           PrimitivePageStatsCollector.newInstance(column.getDataType());
+     }
+   }
+ }
+
+ private void initDataMapFile() throws IOException {
+ if (!FileFactory.isFileExist(dataMapPath) &&
+ !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
+ throw new IOException("Failed to create directory " + dataMapPath);
+ }
+
+ try {
+ currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
+ MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
+ FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
+ currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
+ FileFactory.getFileType(currentIndexFile));
+ } catch (IOException e) {
+ CarbonUtil.closeStreams(currentIndexFileOutStream);
+ LOGGER.error("Failed to init datamap index file", e);
+ throw e;
+ }
+ }
+
+ /**
+  * Clears the statistics accumulated by each index column's min/max collector.
+  */
+ protected void resetBlockletLevelMinMax() {
+   int columnCount = indexColumns.size();
+   for (int col = 0; col < columnCount; col++) {
+     ColumnPageStatsCollector collector = indexColumnMinMaxCollectors[col];
+     collector.getPageStats().clear();
+   }
+ }
+
+ /** No block-level setup is required; min/max values are recorded per blocklet. */
+ @Override public void onBlockStart(String blockId) {
+   // intentionally empty
+ }
+
+ /** No block-level teardown is required; min/max values are recorded per blocklet. */
+ @Override public void onBlockEnd(String blockId) {
+   // intentionally empty
+ }
+
+ /** Nothing to prepare when a blocklet starts; this writer acts only when a blocklet ends. */
+ @Override
+ public void onBlockletStart(int blockletId) {
+   // intentionally empty
+ }
+
+ /**
+  * Calls {@code flushMinMaxIndexFile()} (defined outside this view) for the blocklet
+  * that just ended, then advances {@code currentBlockletId} by one. The
+  * {@code blockletId} argument is not read.
+  */
+ @Override
+ public void onBlockletEnd(int blockletId) {
+   flushMinMaxIndexFile();
+   currentBlockletId += 1;
+ }
+
+ @Override
+ public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
+ // as an example, we don't use page-level min-max generated by native carbondata here, we get
+ // the min-max by comparing each row
+ for (int rowId = 0; rowId < pageSize; rowId++) {
+ for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
+ Object originValue = pages[colIdx].getData(rowId);
+ updateBlockletMinMax(colIdx, originValue);
+ }
+ }
+ }
+
+ protected void updateBlockletMinMax(int indexColIdx, Object value) {
+ if (null == value) {
+ indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
+ return;
+ }
+
+ CarbonColumn indexCol = indexColumns.get(indexColIdx);
+ DataType dataType = indexCol.getDataType();
+ if (indexCol.isMeasure()
+ || (indexCol.isDimension()
+ && DataTypeUtil.isPrimitiveColumn(dataType)
+ && !indexCol.hasEncoding(Encoding.DICTIONARY)
+ && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
+ if (DataTypes.BOOLEAN == dataType) {
+ indexColumnMinMaxCollectors[indexColIdx].update(
+ BooleanConvert.boolean2Byte((boolean) value));
+ } else if (DataTypes.SHORT == dataType) {
--- End diff --
Should `DataTypes.BYTE` also be handled here?
---