You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/13 20:01:20 UTC
[3/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter interface
[CARBONDATA-1363] Add DataMapWriter interface
This closes #1238
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f089287c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f089287c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f089287c
Branch: refs/heads/master
Commit: f089287cef1d685b81e8fa26868325503acdb635
Parents: 85cbad2
Author: Jacky Li <ja...@qq.com>
Authored: Thu Aug 10 13:36:18 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Aug 14 01:30:40 2017 +0530
----------------------------------------------------------------------
.../core/datamap/DataMapDistributable.java | 56 +++
.../carbondata/core/datamap/DataMapMeta.java | 42 ++
.../core/datamap/DataMapStoreManager.java | 144 +++++++
.../carbondata/core/datamap/TableDataMap.java | 142 +++++++
.../carbondata/core/datamap/dev/DataMap.java | 57 +++
.../core/datamap/dev/DataMapFactory.java | 73 ++++
.../core/datamap/dev/DataMapWriter.java | 58 +++
.../core/datastore/page/EncodedTablePage.java | 11 -
.../indexstore/BlockletDataMapIndexStore.java | 13 +-
.../carbondata/core/indexstore/DataMap.java | 60 ---
.../core/indexstore/DataMapDistributable.java | 56 ---
.../core/indexstore/DataMapFactory.java | 87 ----
.../core/indexstore/DataMapStoreManager.java | 139 -------
.../carbondata/core/indexstore/DataMapType.java | 36 --
.../core/indexstore/DataMapWriter.java | 50 ---
.../core/indexstore/TableDataMap.java | 133 ------
.../blockletindex/BlockletDataMap.java | 45 +-
.../blockletindex/BlockletDataMapFactory.java | 46 +-
.../core/metadata/AbsoluteTableIdentifier.java | 4 +
.../core/metadata/CarbonTableIdentifier.java | 6 +-
.../apache/carbondata/core/util/CarbonUtil.java | 6 +
.../hadoop/api/CarbonTableInputFormat.java | 16 +-
.../spark/load/CarbonLoaderUtilTest.java | 417 -------------------
.../validation/FileFooterValidator.java | 155 -------
.../testsuite/datamap/DataMapWriterSuite.scala | 180 ++++++++
.../spark/rdd/DataManagementFunc.scala | 5 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 2 +-
.../spark/sql/hive/CarbonHiveMetaStore.scala | 2 +-
.../iud/DeleteCarbonTableSubqueryTestCase.scala | 5 +-
.../core/datastore/GenericDataType.java | 145 -------
.../datamap/DataMapWriterListener.java | 138 ++++++
.../processing/datatypes/ArrayDataType.java | 1 -
.../processing/datatypes/GenericDataType.java | 145 +++++++
.../processing/datatypes/PrimitiveDataType.java | 1 -
.../processing/datatypes/StructDataType.java | 1 -
.../impl/ComplexFieldConverterImpl.java | 2 +-
.../converter/impl/FieldEncoderFactory.java | 2 +-
.../store/CarbonFactDataHandlerColumnar.java | 131 +++---
.../store/CarbonFactDataHandlerModel.java | 14 +-
.../carbondata/processing/store/TablePage.java | 66 ++-
.../store/writer/AbstractFactDataWriter.java | 115 +++--
.../store/writer/CarbonDataWriterVo.java | 11 +
.../store/writer/CarbonFactDataWriter.java | 5 +-
.../writer/v1/CarbonFactDataWriterImplV1.java | 11 +-
.../writer/v2/CarbonFactDataWriterImplV2.java | 13 +-
.../store/writer/v3/BlockletDataHolder.java | 72 ++++
.../writer/v3/CarbonFactDataWriterImplV3.java | 131 +++---
.../store/writer/v3/DataWriterHolder.java | 62 ---
.../util/CarbonDataProcessorUtil.java | 2 +-
49 files changed, 1502 insertions(+), 1612 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
new file mode 100644
index 0000000..517f629
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+/**
+ * Distributable unit of a datamap; it holds the table path, segment id and datamap name.
+ */
+public abstract class DataMapDistributable implements Distributable {
+
+ private String tablePath;
+
+ private String segmentId;
+
+ private String dataMapName;
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ public void setTablePath(String tablePath) {
+ this.tablePath = tablePath;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ public String getDataMapName() {
+ return dataMapName;
+ }
+
+ public void setDataMapName(String dataMapName) {
+ this.dataMapName = dataMapName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
new file mode 100644
index 0000000..7746acf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+/** Metadata of a datamap: the names of its indexed columns and its optimized FilterType. */
+public class DataMapMeta {
+
+ private List<String> indexedColumns;
+
+ private FilterType optimizedOperation;
+
+ public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) {
+ this.indexedColumns = indexedColumns;
+ this.optimizedOperation = optimizedOperation;
+ }
+
+ public List<String> getIndexedColumns() {
+ return indexedColumns;
+ }
+
+ public FilterType getOptimizedOperation() {
+ return optimizedOperation;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
new file mode 100644
index 0000000..f5bc22f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * It maintains all the DataMaps in it.
+ */
+public final class DataMapStoreManager {
+
+ private static DataMapStoreManager instance = new DataMapStoreManager();
+
+ /**
+ * Contains the list of datamaps for each table.
+ */
+ private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
+
+ private DataMapStoreManager() {
+
+ }
+
+ public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
+ return allDataMaps.get(identifier.uniqueName());
+ }
+
+ /**
+ * Get the datamap for reading data.
+ *
+ * @param dataMapName
+ * @param factoryClass
+ * @return
+ */
+ public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+ Class<? extends DataMapFactory> factoryClass) {
+ String table = identifier.uniqueName();
+ List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+ TableDataMap dataMap;
+ if (tableDataMaps == null) {
+ dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName);
+ } else {
+ dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+ }
+ if (dataMap == null) {
+ throw new RuntimeException("Datamap does not exist");
+ }
+ return dataMap;
+ }
+
+ /**
+ * Return a new datamap instance and registered in the store manager.
+ * The datamap is created using datamap name, datamap factory class and table identifier.
+ */
+ public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
+ Class<? extends DataMapFactory> factoryClass, String dataMapName) {
+ String table = identifier.uniqueName();
+ List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+ if (tableDataMaps == null) {
+ tableDataMaps = new ArrayList<>();
+ allDataMaps.put(table, tableDataMaps);
+ }
+ TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+ if (dataMap != null) {
+ throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
+ }
+
+ try {
+ DataMapFactory dataMapFactory = factoryClass.newInstance();
+ dataMapFactory.init(identifier, dataMapName);
+ dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw new RuntimeException(e);
+ }
+ tableDataMaps.add(dataMap);
+ return dataMap;
+ }
+
+ private TableDataMap getAbstractTableDataMap(String dataMapName,
+ List<TableDataMap> tableDataMaps) {
+ TableDataMap dataMap = null;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap.getDataMapName().equals(dataMapName)) {
+ dataMap = tableDataMap;
+ break;
+ }
+ }
+ return dataMap;
+ }
+
+ /**
+ * Clear the datamap/datamaps of a mentioned datamap name and table from memory
+ * @param identifier
+ * @param dataMapName
+ */
+ public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+ List<TableDataMap> tableDataMaps = allDataMaps.get(identifier);
+ if (tableDataMaps != null) {
+ int i = 0;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+ tableDataMap.clear();
+ tableDataMaps.remove(i);
+ break;
+ }
+ i++;
+ }
+ }
+ }
+
+ /**
+ * Returns the singleton instance
+ * @return
+ */
+ public static DataMapStoreManager getInstance() {
+ return instance;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
new file mode 100644
index 0000000..b55c5d9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.events.EventListener;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * DataMap at the table level; a user can add any number of datamaps to one table. Depending
+ * on the filter condition, it can prune the blocklets.
+ */
+public final class TableDataMap implements EventListener {
+
+ private AbsoluteTableIdentifier identifier;
+
+ private String dataMapName;
+
+ private DataMapFactory dataMapFactory;
+
+ /**
+ * Create a table datamap that keeps the given table identifier, datamap name and factory.
+ */
+ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+ DataMapFactory dataMapFactory) {
+ this.identifier = identifier;
+ this.dataMapName = dataMapName;
+ this.dataMapFactory = dataMapFactory;
+ }
+
+ /**
+ * Prune the datamaps of the given segments using the filter expression.
+ *
+ * @param segmentIds ids of the segments whose datamaps are pruned
+ * @param filterExp filter expression handed to each datamap
+ * @return blocklets returned by all the datamaps, each one tagged with its segment id
+ */
+ public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp)
+ throws IOException {
+ List<Blocklet> blocklets = new ArrayList<>();
+ for (String segmentId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ for (DataMap dataMap : dataMaps) {
+ List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+ blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+ }
+ }
+ return blocklets;
+ }
+
+ private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+ for (Blocklet blocklet : pruneBlocklets) {
+ blocklet.setSegmentId(segmentId);
+ }
+ return pruneBlocklets;
+ }
+
+ /**
+ * This is used for making the datamap distributable.
+ * It takes the valid segments and returns all the datamaps as distributable objects so that
+ * it can be distributed across machines.
+ * @param segmentIds ids of the segments whose datamaps are converted
+ * @return one distributable object per datamap of the given segments
+ */
+ public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException {
+ List<DataMapDistributable> distributables = new ArrayList<>();
+ for (String segmentsId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+ for (DataMap dataMap : dataMaps) {
+ distributables.add(dataMap.toDistributable());
+ }
+ }
+ return distributables;
+ }
+
+ /**
+ * This method is used from any machine after it is distributed. It takes the distributable object
+ * to prune the filters.
+ *
+ * @param distributable distributable object used to get the datamap from the factory
+ * @param filterExp filter expression handed to the datamap
+ * @return blocklets returned by that datamap
+ */
+ public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+ return dataMapFactory.getDataMap(distributable).prune(filterExp);
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+ dataMapFactory.fireEvent(event);
+ }
+
+ /**
+ * Clear only the datamaps of the segments
+ * @param segmentIds ids of the segments whose datamaps are cleared
+ */
+ public void clear(List<String> segmentIds) {
+ for (String segmentId: segmentIds) {
+ dataMapFactory.clear(segmentId);
+ }
+ }
+
+ /**
+ * Clears all datamaps through the factory
+ */
+ public void clear() {
+ dataMapFactory.clear();
+ }
+ /**
+ * Get the unique name of datamap
+ *
+ * @return name of this datamap
+ */
+ public String getDataMapName() {
+ return dataMapName;
+ }
+
+ public DataMapFactory getDataMapFactory() {
+ return dataMapFactory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
new file mode 100644
index 0000000..526572a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * Datamap is an entity which can store and retrieve index data.
+ */
+public interface DataMap {
+
+ /**
+ * It is called with a path to load the data map to memory or to initialize it.
+ */
+ void init(String path) throws MemoryException, IOException;
+
+ /**
+ * Prune the datamap with filter expression. It returns the list of
+ * blocklets where these filters can exist.
+ *
+ * @param filterExp filter expression used for pruning
+ * @return blocklets where the filter can match
+ */
+ List<Blocklet> prune(FilterResolverIntf filterExp);
+
+ /**
+ * Convert datamap to distributable object
+ * @return distributable form of this datamap
+ */
+ DataMapDistributable toDistributable();
+
+ /**
+ * Clear complete index table and release memory.
+ */
+ void clear();
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
new file mode 100644
index 0000000..873457c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for datamap factory, it is responsible for creating the datamap.
+ */
+public interface DataMapFactory {
+
+ /**
+ * Initialization of Datamap factory with the identifier and datamap name
+ */
+ void init(AbsoluteTableIdentifier identifier, String dataMapName);
+
+ /**
+ * Return a new writer for this datamap for the given segment
+ */
+ DataMapWriter createWriter(String segmentId);
+
+ /**
+ * Get the datamaps of the given segment id
+ */
+ List<DataMap> getDataMaps(String segmentId) throws IOException;
+
+ /**
+ * Get datamap for distributable object.
+ */
+ DataMap getDataMap(DataMapDistributable distributable);
+
+ /**
+ * Notify this factory of a change event.
+ * @param event the change event
+ */
+ void fireEvent(ChangeEvent event);
+
+ /**
+ * Clears datamap of the segment
+ */
+ void clear(String segmentId);
+
+ /**
+ * Clear all datamaps from memory
+ */
+ void clear();
+
+ /**
+ * Return metadata of this datamap
+ */
+ DataMapMeta getMeta();
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
new file mode 100644
index 0000000..28163d7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ * Data Map writer, notified on block and blocklet start/end and on each added page.
+ */
+public interface DataMapWriter {
+
+ /**
+ * Start of new block notification.
+ * @param blockId file name of the carbondata file
+ */
+ void onBlockStart(String blockId);
+
+ /**
+ * End of block notification for the given block id
+ */
+ void onBlockEnd(String blockId);
+
+ /**
+ * Start of new blocklet notification.
+ * @param blockletId sequence number of blocklet in the block
+ */
+ void onBlockletStart(int blockletId);
+
+ /**
+ * End of blocklet notification
+ * @param blockletId sequence number of blocklet in the block
+ */
+ void onBlockletEnd(int blockletId);
+
+ /**
+ * Add a row of column pages to the datamap; the order of pages is the same as
+ * `indexedColumns` in the DataMapMeta returned by DataMapFactory.
+ *
+ * Implementation should copy the content of `pages` as needed, because `pages` memory
+ * may be freed after this method returns, if using unsafe column page.
+ */
+ void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
index ea9c373..0aac1d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
@@ -42,9 +42,6 @@ public class EncodedTablePage {
// number of row in this page
private int pageSize;
- // true if it is last page of all input rows
- private boolean isLastPage;
-
// size in bytes of all encoded columns (including data and metadate)
private int encodedSize;
@@ -128,14 +125,6 @@ public class EncodedTablePage {
return pageKey;
}
- public boolean isLastPage() {
- return isLastPage;
- }
-
- public void setIsLastPage(boolean isWriteAll) {
- this.isLastPage = isWriteAll;
- }
-
public EncodedMeasurePage getMeasure(int measureIndex) {
return measures[measureIndex];
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index fc8c273..9d4af7b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -26,8 +26,8 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
/**
* Class to handle loading, unloading,clearing,storing of the table
@@ -73,10 +73,9 @@ public class BlockletDataMapIndexStore
if (dataMap == null) {
try {
dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
- } catch (IndexBuilderException e) {
- throw new IOException(e.getMessage(), e);
- } catch (Throwable e) {
- throw new IOException("Problem in loading segment block.", e);
+ } catch (MemoryException e) {
+ LOGGER.error("memory exception when loading datamap: " + e.getMessage());
+ throw new RuntimeException(e.getMessage(), e);
}
}
return dataMap;
@@ -93,6 +92,7 @@ public class BlockletDataMapIndexStore
for (BlockletDataMap dataMap : blockletDataMaps) {
dataMap.clear();
}
+ e.printStackTrace();
throw new IOException("Problem in loading segment blocks.", e);
}
return blockletDataMaps;
@@ -130,7 +130,8 @@ public class BlockletDataMapIndexStore
* @throws IOException
*/
private BlockletDataMap loadAndGetDataMap(
- TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+ throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
deleted file mode 100644
index 1276494..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
-/**
- * Datamap is an entity which can store and retrieve index data.
- */
-public interface DataMap {
-
- /**
- * Give the writer to write the data.
- *
- * @return
- */
- DataMapWriter getWriter();
-
- /**
- * It is called to load the data map to memory or to initialize it.
- */
- void init(String path);
-
- /**
- * Prune the datamap with filter expression. It returns the list of
- * blocklets where these filters can exist.
- *
- * @param filterExp
- * @return
- */
- List<Blocklet> prune(FilterResolverIntf filterExp);
-
- /**
- * Convert datamap to distributable object
- * @return
- */
- DataMapDistributable toDistributable();
-
- /**
- * Clear complete index table and release memory.
- */
- void clear();
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
deleted file mode 100644
index 4c379f3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-
-/**
- * Distributable class for datamap.
- */
-public abstract class DataMapDistributable implements Distributable {
-
- private String tablePath;
-
- private String segmentId;
-
- private String dataMapName;
-
- public String getTablePath() {
- return tablePath;
- }
-
- public void setTablePath(String tablePath) {
- this.tablePath = tablePath;
- }
-
- public String getSegmentId() {
- return segmentId;
- }
-
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- public String getDataMapName() {
- return dataMapName;
- }
-
- public void setDataMapName(String dataMapName) {
- this.dataMapName = dataMapName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
deleted file mode 100644
index 72f714f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * Interface for datamap factory, it is responsible for creating the datamap.
- */
-public interface DataMapFactory {
-
- /**
- * Initialization of Datamap factory
- * @param identifier
- * @param dataMapName
- */
- void init(AbsoluteTableIdentifier identifier, String dataMapName);
- /**
- * Get the datamap writer for each segmentid.
- *
- * @param identifier
- * @param segmentId
- * @return
- */
- DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
- String segmentId);
-
- /**
- * Get the datamap for segmentid
- *
- * @param segmentId
- * @return
- */
- List<DataMap> getDataMaps(String segmentId);
-
- /**
- * Get datamap for distributable object.
- *
- * @param distributable
- * @return
- */
- DataMap getDataMap(DataMapDistributable distributable);
-
- /**
- * This method checks whether the columns and the type of filters supported
- * for this datamap or not
- *
- * @param filterType
- * @return
- */
- boolean isFiltersSupported(FilterType filterType);
-
- /**
- *
- * @param event
- */
- void fireEvent(ChangeEvent event);
-
- /**
- * Clears datamap of the segment
- */
- void clear(String segmentId);
-
- /**
- * Clear all datamaps from memory
- */
- void clear();
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
deleted file mode 100644
index 1664a6a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * It maintains all the DataMaps in it.
- */
-public final class DataMapStoreManager {
-
- private static DataMapStoreManager instance = new DataMapStoreManager();
-
- /**
- * Contains the list of datamaps for each table.
- */
- private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>();
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
-
- private DataMapStoreManager() {
-
- }
-
- /**
- * Get the datamap for reading data.
- *
- * @param dataMapName
- * @param mapType
- * @return
- */
- public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
- DataMapType mapType) {
- List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
- TableDataMap dataMap;
- if (tableDataMaps == null) {
- createTableDataMap(identifier, mapType, dataMapName);
- tableDataMaps = dataMapMappping.get(identifier);
- }
- dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
- if (dataMap == null) {
- throw new RuntimeException("Datamap does not exist");
- }
- return dataMap;
- }
-
- /**
- * Create new datamap instance using datamap name, datamap type and table identifier
- *
- * @param mapType
- * @return
- */
- private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
- DataMapType mapType, String dataMapName) {
- List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
- if (tableDataMaps == null) {
- tableDataMaps = new ArrayList<>();
- dataMapMappping.put(identifier, tableDataMaps);
- }
- TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
- if (dataMap != null) {
- throw new RuntimeException("Already datamap exists in that path with type " + mapType);
- }
-
- try {
- DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
- dataMapFactory.init(identifier, dataMapName);
- dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
- } catch (Exception e) {
- LOGGER.error(e);
- throw new RuntimeException(e);
- }
- tableDataMaps.add(dataMap);
- return dataMap;
- }
-
- private TableDataMap getAbstractTableDataMap(String dataMapName,
- List<TableDataMap> tableDataMaps) {
- TableDataMap dataMap = null;
- for (TableDataMap tableDataMap: tableDataMaps) {
- if (tableDataMap.getDataMapName().equals(dataMapName)) {
- dataMap = tableDataMap;
- break;
- }
- }
- return dataMap;
- }
-
- /**
- * Clear the datamap/datamaps of a mentioned datamap name and table from memory
- * @param identifier
- * @param dataMapName
- */
- public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
- List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
- if (tableDataMaps != null) {
- int i = 0;
- for (TableDataMap tableDataMap: tableDataMaps) {
- if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
- tableDataMap.clear();
- tableDataMaps.remove(i);
- break;
- }
- i++;
- }
- }
- }
-
- /**
- * Returns the singleton instance
- * @return
- */
- public static DataMapStoreManager getInstance() {
- return instance;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
deleted file mode 100644
index 0059b29..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
-
-/**
- * Datamap type
- */
-public enum DataMapType {
- BLOCKLET(BlockletDataMapFactory.class);
-
- private Class<? extends DataMapFactory> classObject;
-
- DataMapType(Class<? extends DataMapFactory> classObject) {
- this.classObject = classObject;
- }
-
- public Class<? extends DataMapFactory> getClassObject() {
- return classObject;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
deleted file mode 100644
index bd8be09..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.io.DataOutput;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter<T> {
-
- /**
- * Initialize the data map writer with output stream
- *
- * @param outStream
- */
- void init(DataOutput outStream);
-
- /**
- * Add the index row to the in-memory store.
- */
- void writeData(T data);
-
- /**
- * Get the added row count
- *
- * @return
- */
- int getRowCount();
-
- /**
- * Finish writing of data map table, otherwise it will not be allowed to read.
- */
- void finish();
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
deleted file mode 100644
index 39ca4c5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.events.EventListener;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-/**
- * DataMap at the table level, user can add any number of datamaps for one table. Depends
- * on the filter condition it can prune the blocklets.
- */
-public final class TableDataMap implements EventListener {
-
- private AbsoluteTableIdentifier identifier;
-
- private String dataMapName;
-
- private DataMapFactory dataMapFactory;
-
- /**
- * It is called to initialize and load the required table datamap metadata.
- */
- public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
- DataMapFactory dataMapFactory) {
- this.identifier = identifier;
- this.dataMapName = dataMapName;
- this.dataMapFactory = dataMapFactory;
- }
-
- /**
- * Pass the valid segments and prune the datamap using filter expression
- *
- * @param segmentIds
- * @param filterExp
- * @return
- */
- public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
- List<Blocklet> blocklets = new ArrayList<>();
- for (String segmentId : segmentIds) {
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
- for (DataMap dataMap : dataMaps) {
- List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
- blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
- }
- }
- return blocklets;
- }
-
- private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
- for (Blocklet blocklet : pruneBlocklets) {
- blocklet.setSegmentId(segmentId);
- }
- return pruneBlocklets;
- }
-
- /**
- * This is used for making the datamap distributable.
- * It takes the valid segments and returns all the datamaps as distributable objects so that
- * it can be distributed across machines.
- *
- * @return
- */
- public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
- List<DataMapDistributable> distributables = new ArrayList<>();
- for (String segmentsId : segmentIds) {
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
- for (DataMap dataMap : dataMaps) {
- distributables.add(dataMap.toDistributable());
- }
- }
- return distributables;
- }
-
- /**
- * This method is used from any machine after it is distributed. It takes the distributable object
- * to prune the filters.
- *
- * @param distributable
- * @param filterExp
- * @return
- */
- public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
- return dataMapFactory.getDataMap(distributable).prune(filterExp);
- }
-
- @Override public void fireEvent(ChangeEvent event) {
- dataMapFactory.fireEvent(event);
- }
-
- /**
- * Clear only the datamaps of the segments
- * @param segmentIds
- */
- public void clear(List<String> segmentIds) {
- for (String segmentId: segmentIds) {
- dataMapFactory.clear(segmentId);
- }
- }
-
- /**
- * Clears all datamap
- */
- public void clear() {
- dataMapFactory.clear();
- }
- /**
- * Get the unique name of datamap
- *
- * @return
- */
- public String getDataMapName() {
- return dataMapName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 4b5be11..2e82c46 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -31,14 +31,13 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -64,6 +63,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+ public static final String NAME = "clustered.btree.blocklet";
+
private static int KEY_INDEX = 0;
private static int MIN_VALUES_INDEX = 1;
@@ -88,31 +89,23 @@ public class BlockletDataMap implements DataMap, Cacheable {
private int[] columnCardinality;
- @Override public DataMapWriter getWriter() {
- return null;
- }
-
- @Override public void init(String path) {
+ @Override public void init(String path) throws IOException, MemoryException {
DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
- try {
- List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
- for (DataFileFooter fileFooter : indexInfo) {
- List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
- if (segmentProperties == null) {
- columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
- segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
- createSchema(segmentProperties);
- }
- TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
- fileFooter = CarbonUtil.readMetadatFile(blockInfo);
-
- loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
- }
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.finishWriting();
+ List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+ for (DataFileFooter fileFooter : indexInfo) {
+ List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+ if (segmentProperties == null) {
+ columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+ segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+ createSchema(segmentProperties);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+ fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+ loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+ }
+ if (unsafeMemoryDMStore != null) {
+ unsafeMemoryDMStore.finishWriting();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2fe6643..e189931 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -25,16 +25,16 @@ import java.util.Map;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapFactory;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
/**
@@ -44,21 +44,25 @@ public class BlockletDataMapFactory implements DataMapFactory {
private AbsoluteTableIdentifier identifier;
+ // segmentId -> list of index file
private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+ @Override
public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
this.identifier = identifier;
cache = CacheProvider.getInstance()
.createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
}
- public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
- return null;
+ @Override
+ public DataMapWriter createWriter(String segmentId) {
+ throw new UnsupportedOperationException("not implemented");
}
- public List<DataMap> getDataMaps(String segmentId) {
+ @Override
+ public List<DataMap> getDataMaps(String segmentId) throws IOException {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segmentId);
if (tableBlockIndexUniqueIdentifiers == null) {
@@ -77,17 +81,10 @@ public class BlockletDataMapFactory implements DataMapFactory {
}
}
- try {
- return cache.getAll(tableBlockIndexUniqueIdentifiers);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override public boolean isFiltersSupported(FilterType filterType) {
- return true;
+ return cache.getAll(tableBlockIndexUniqueIdentifiers);
}
+ @Override
public void clear(String segmentId) {
List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
if (blockIndexes != null) {
@@ -99,17 +96,26 @@ public class BlockletDataMapFactory implements DataMapFactory {
}
}
- @Override public void clear() {
+ @Override
+ public void clear() {
for (String segmentId: segmentMap.keySet()) {
clear(segmentId);
}
}
- @Override public DataMap getDataMap(DataMapDistributable distributable) {
+ @Override
+ public DataMap getDataMap(DataMapDistributable distributable) {
return null;
}
- @Override public void fireEvent(ChangeEvent event) {
+ @Override
+ public void fireEvent(ChangeEvent event) {
}
+
+ @Override
+ public DataMapMeta getMeta() {
+ // TODO: pass SORT_COLUMNS into this class
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 22faaf2..31ad03b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -156,4 +156,8 @@ public class AbsoluteTableIdentifier implements Serializable {
}
return true;
}
+
+ public String uniqueName() {
+ return storePath + "/" + carbonTableIdentifier.toString().toLowerCase();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
index 31a0b23..cc65d9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
@@ -128,9 +128,9 @@ public class CarbonTableIdentifier implements Serializable {
return true;
}
- /*
- * @return table unidque name
- */
+ /**
+ * return unique table name
+ */
@Override public String toString() {
return databaseName + '_' + tableName;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index edc4c28..15512a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1950,5 +1950,11 @@ public final class CarbonUtil {
throw new IllegalArgumentException("Invalid data type: " + meta.getType());
}
}
+
+ public static void requireNotNull(Object obj) {
+ if (obj == null) {
+ throw new IllegalArgumentException("parameter not be null");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 54ad18b..19e264b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -30,11 +30,12 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.DataMapStoreManager;
-import org.apache.carbondata.core.indexstore.DataMapType;
-import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -246,7 +247,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
@Override public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
TableDataMap blockletMap =
- DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
+ BlockletDataMapFactory.class);
List<String> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
@@ -403,7 +405,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
TableDataMap blockletMap = DataMapStoreManager.getInstance()
- .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+ .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
@@ -549,8 +551,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
*/
public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
throws IOException, KeyGenException {
- TableDataMap blockletMap =
- DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ TableDataMap blockletMap = DataMapStoreManager.getInstance()
+ .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
new SegmentStatusManager(identifier).getValidAndInvalidSegments();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
deleted file mode 100644
index 76c7e6f..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.load;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.spark.load.CarbonLoaderUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test class to test block distribution functionality
- */
-public class CarbonLoaderUtilTest {
- List<Distributable> blockInfos = null;
- int noOfNodesInput = -1;
- List<String> activeNode = null;
- Map<String, List<Distributable>> expected = null;
- Map<String, List<Distributable>> mapOfNodes = null;
-
- @Test public void nodeBlockMapping() throws Exception {
-
- // scenario when the 3 nodes and 3 executors
- initSet1();
- Map<String, List<Distributable>> mapOfNodes =
- CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
- // node allocation
- Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
- // block allocation
- boolean isEqual = compareResult(expected, mapOfNodes);
- Assert.assertTrue("Block Allocation", isEqual);
-
- // 2 node and 3 executors
- initSet2();
- mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
- // node allocation
- Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
- // block allocation
- isEqual = compareResult(expected, mapOfNodes);
- Assert.assertTrue("Block Allocation", isEqual);
-
- // 3 data node and 2 executors
- initSet3();
- mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
- // node allocation
- Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
- // block allocation
- isEqual = compareResult(expected, mapOfNodes);
- Assert.assertTrue("Block Allocation", isEqual);
- }
-
- /**
- * compares the blocks allocation
- *
- * @param expectedResult
- * @param actualResult
- * @return
- */
- private boolean compareResult(Map<String, List<Distributable>> expectedResult,
- Map<String, List<Distributable>> actualResult) {
- expectedResult = sortByListSize(expectedResult);
- actualResult = sortByListSize(actualResult);
- List<List<Distributable>> expectedList = new LinkedList(expectedResult.entrySet());
- List<List<Distributable>> mapOfNodesList = new LinkedList(actualResult.entrySet());
- boolean isEqual = expectedList.size() == mapOfNodesList.size();
- if (isEqual) {
- for (int i = 0; i < expectedList.size(); i++) {
- int size1 = ((List) ((Map.Entry) (expectedList.get(i))).getValue()).size();
- int size2 = ((List) ((Map.Entry) (mapOfNodesList.get(i))).getValue()).size();
- isEqual = size1 == size2;
- if (!isEqual) {
- break;
- }
- }
- }
- return isEqual;
- }
-
- /**
- * sort by list size
- *
- * @param map
- * @return
- */
- private static Map<String, List<Distributable>> sortByListSize(
- Map<String, List<Distributable>> map) {
- List<List<Distributable>> list = new LinkedList(map.entrySet());
- Collections.sort(list, new Comparator() {
- public int compare(Object obj1, Object obj2) {
- if (obj1 == null && obj2 == null) {
- return 0;
- } else if (obj1 == null) {
- return 1;
- } else if (obj2 == null) {
- return -1;
- }
- int size1 = ((List) ((Map.Entry) (obj1)).getValue()).size();
- int size2 = ((List) ((Map.Entry) (obj2)).getValue()).size();
- return size2 - size1;
- }
- });
-
- Map res = new LinkedHashMap();
- for (Iterator it = list.iterator(); it.hasNext(); ) {
- Map.Entry entry = (Map.Entry) it.next();
- res.put(entry.getKey(), entry.getValue());
- }
- return res;
- }
-
- void initSet1() {
- blockInfos = new ArrayList<>();
- activeNode = new ArrayList<>();
- activeNode.add("node-7");
- activeNode.add("node-9");
- activeNode.add("node-11");
- String[] location = { "node-7", "node-9", "node-11" };
- blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
- expected = new HashMap<>();
- expected.put("node-7", blockInfos.subList(0, 2));
- expected.put("node-9", blockInfos.subList(2, 4));
- expected.put("node-11", blockInfos.subList(4, 6));
- }
-
- void initSet2() {
- blockInfos = new ArrayList<>();
- activeNode = new ArrayList<>();
- activeNode.add("node-7");
- activeNode.add("node-9");
- activeNode.add("node-11");
- String[] location = { "node-7", "node-11" };
- blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
- expected = new HashMap<>();
- expected.put("node-7", blockInfos.subList(0, 2));
- expected.put("node-9", blockInfos.subList(2, 4));
- expected.put("node-11", blockInfos.subList(4, 6));
- }
-
- void initSet3() {
- blockInfos = new ArrayList<>();
- activeNode = new ArrayList<>();
- activeNode.add("node-7");
- activeNode.add("node-11");
- String[] location = { "node-7", "node-9", "node-11" };
- blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
- expected = new HashMap<>();
- expected.put("node-7", blockInfos.subList(0, 3));
- expected.put("node-11", blockInfos.subList(3, 6));
- }
-
-
- /**
- * Test case with 4 blocks and 4 nodes with 3 replication.
- *
- * @throws Exception
- */
- @Test public void nodeBlockMapping() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("path1", 123, "1", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("path2", 123, "2", new String[] { "2", "3", "4" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("path3", 123, "3", new String[] { "3", "4", "1" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("path4", 123, "4", new String[] { "1", "2", "4" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block2, Arrays.asList(new String[]{"2","3","4"}));
- inputMap.put(block3, Arrays.asList(new String[]{"3","4","1"}));
- inputMap.put(block4, Arrays.asList(new String[]{"1","2","4"}));
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
-
- Map<String, List<TableBlockInfo>> outputMap
- = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 4, 4));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 4, 4));
- }
-
- private boolean calculateBlockLocality(Map<TableBlockInfo, List<String>> inputMap,
- Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
- double notInNodeLocality = 0;
- for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
- List<TableBlockInfo> blockListOfANode = entry.getValue();
-
- for (TableBlockInfo eachBlock : blockListOfANode) {
-
- // for each block check the node locality
-
- List<String> blockLocality = inputMap.get(eachBlock);
- if (!blockLocality.contains(entry.getKey())) {
- notInNodeLocality++;
- }
- }
- }
-
- System.out.println(
- ((notInNodeLocality / numberOfBlocks) * 100) + " " + "is the node locality mismatch");
- if ((notInNodeLocality / numberOfBlocks) * 100 > 30) {
- return false;
- }
- return true;
- }
-
- private boolean calculateBlockDistribution(Map<TableBlockInfo, List<String>> inputMap,
- Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
- int nodesPerBlock = numberOfBlocks / numberOfNodes;
-
- for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
- if (entry.getValue().size() < nodesPerBlock) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Test case with 5 blocks and 3 nodes
- *
- * @throws Exception
- */
- @Test public void nodeBlockMappingTestWith5blocks3nodes() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block5 =
- new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
- inputBlocks.add(block5);
-
- Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 3);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 5, 3));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 5, 3));
-
- }
-
- /**
- * Test case with 6 blocks and 4 nodes where 4 th node doesnt have any local data.
- *
- * @throws Exception
- */
- @Test public void nodeBlockMappingTestWith6Blocks4nodes() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block5 =
- new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block6 =
- new TableBlockInfo("part-5-0-1462341987000", 123, "6", new String[] { "1", "2", "3" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block6, Arrays.asList(new String[]{"1","2","3"}));
-
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
- inputBlocks.add(block5);
- inputBlocks.add(block6);
-
- Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 6, 4));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 6, 4));
-
- }
-
- /**
- * Test case with 10 blocks and 4 nodes with 10,60,30 % distribution
- *
- * @throws Exception
- */
- @Test public void nodeBlockMappingTestWith10Blocks4nodes() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("part-1-0-1462341987000", 123, "1", new String[] { "2", "4" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("part-2-0-1462341987000", 123, "2", new String[] { "2", "4" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("part-3-0-1462341987000", 123, "3", new String[] { "2", "4" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("part-4-0-1462341987000", 123, "4", new String[] { "2", "4" }, 111);
- TableBlockInfo block5 =
- new TableBlockInfo("part-5-0-1462341987000", 123, "5", new String[] { "2", "4" }, 111);
- TableBlockInfo block6 =
- new TableBlockInfo("part-6-0-1462341987000", 123, "6", new String[] { "2", "4" }, 111);
- TableBlockInfo block7 =
- new TableBlockInfo("part-7-0-1462341987000", 123, "7", new String[] { "3", "4" }, 111);
- TableBlockInfo block8 =
- new TableBlockInfo("part-8-0-1462341987000", 123, "8", new String[] { "3", "4" }, 111);
- TableBlockInfo block9 =
- new TableBlockInfo("part-9-0-1462341987000", 123, "9", new String[] { "3", "4" }, 111);
- TableBlockInfo block10 =
- new TableBlockInfo("part-10-0-1462341987000", 123, "9", new String[] { "1", "4" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block2, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block3, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block4, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block5, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block6, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block7, Arrays.asList(new String[]{"3","4"}));
- inputMap.put(block8, Arrays.asList(new String[]{"3","4"}));
- inputMap.put(block9, Arrays.asList(new String[]{"3","4"}));
- inputMap.put(block10, Arrays.asList(new String[]{"1","4"}));
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
- inputBlocks.add(block5);
- inputBlocks.add(block6);
- inputBlocks.add(block7);
- inputBlocks.add(block8);
- inputBlocks.add(block9);
- inputBlocks.add(block10);
-
- Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 10, 4));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 10, 4));
- }
-
-}
\ No newline at end of file