You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:14 UTC
[27/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
deleted file mode 100644
index c7af420..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data from sorted files which are generated in previous sort step.
- * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
- */
-public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
-
- private long readCounter;
-
- public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
- AbstractDataLoadProcessorStep child) {
- super(configuration, child);
- }
-
- public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) {
- super(configuration, null);
- }
-
- @Override public DataField[] getOutput() {
- return child.getOutput();
- }
-
- @Override public void initialize() throws IOException {
- super.initialize();
- child.initialize();
- }
-
- private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
- String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
- configuration.getSegmentId() + "", false, false);
- CarbonDataProcessorUtil.createLocations(storeLocation);
- return storeLocation;
- }
-
- public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) {
- CarbonTableIdentifier tableIdentifier =
- configuration.getTableIdentifier().getCarbonTableIdentifier();
- String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
- CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
- .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0);
- return model;
- }
-
- @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
- Iterator<CarbonRowBatch>[] iterators = child.execute();
- CarbonTableIdentifier tableIdentifier =
- configuration.getTableIdentifier().getCarbonTableIdentifier();
- String tableName = tableIdentifier.getTableName();
- try {
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
- System.currentTimeMillis());
- int i = 0;
- for (Iterator<CarbonRowBatch> iterator : iterators) {
- String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
-
- CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
- .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0);
- CarbonFactHandler dataHandler = null;
- boolean rowsNotExist = true;
- while (iterator.hasNext()) {
- if (rowsNotExist) {
- rowsNotExist = false;
- dataHandler = CarbonFactHandlerFactory
- .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
- dataHandler.initialise();
- }
- processBatch(iterator.next(), dataHandler);
- }
- if (!rowsNotExist) {
- finish(dataHandler);
- }
- i++;
- }
-
- } catch (CarbonDataWriterException e) {
- LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
- throw new CarbonDataLoadingException(
- "Error while initializing data handler : " + e.getMessage());
- } catch (Exception e) {
- LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
- throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
- }
- return null;
- }
-
- @Override protected String getStepName() {
- return "Data Writer";
- }
-
- public void finish(CarbonFactHandler dataHandler) {
- CarbonTableIdentifier tableIdentifier =
- configuration.getTableIdentifier().getCarbonTableIdentifier();
- String tableName = tableIdentifier.getTableName();
-
- try {
- dataHandler.finish();
- } catch (Exception e) {
- LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler");
- }
- LOGGER.info("Record Processed For table: " + tableName);
- String logMessage =
- "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
- + rowCounter.get();
- LOGGER.info(logMessage);
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
- processingComplete(dataHandler);
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
- System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
- }
-
- private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
- if (null != dataHandler) {
- try {
- dataHandler.closeHandler();
- } catch (CarbonDataWriterException e) {
- LOGGER.error(e, e.getMessage());
- throw new CarbonDataLoadingException(e.getMessage(), e);
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
- }
- }
- }
-
- private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
- throws CarbonDataLoadingException {
- try {
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- dataHandler.addDataToStore(row);
- readCounter++;
- }
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
- rowCounter.getAndAdd(batch.getSize());
- }
-
- public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
- try {
- readCounter++;
- dataHandler.addDataToStore(row);
- } catch (Exception e) {
- throw new CarbonDataLoadingException("unable to generate the mdkey", e);
- }
- rowCounter.getAndAdd(1);
- }
-
- @Override protected CarbonRow processRow(CarbonRow row) {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
deleted file mode 100644
index cbeb20a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.parser.RowParser;
-import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-
-/**
- * It reads data from record reader and sends data to next step.
- */
-public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
- private RowParser rowParser;
-
- private CarbonIterator<Object[]>[] inputIterators;
-
- /**
- * executor service to execute the query
- */
- public ExecutorService executorService;
-
- public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
- CarbonIterator<Object[]>[] inputIterators) {
- super(configuration, null);
- this.inputIterators = inputIterators;
- }
-
- @Override public DataField[] getOutput() {
- return configuration.getDataFields();
- }
-
- @Override public void initialize() throws IOException {
- super.initialize();
- rowParser = new RowParserImpl(getOutput(), configuration);
- executorService = Executors.newCachedThreadPool();
- }
-
- @Override public Iterator<CarbonRowBatch>[] execute() {
- int batchSize = CarbonProperties.getInstance().getBatchSize();
- List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
- Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
- for (int i = 0; i < outIterators.length; i++) {
- outIterators[i] =
- new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
- configuration.isPreFetch(), executorService, rowCounter);
- }
- return outIterators;
- }
-
- /**
- * Partition input iterators equally as per the number of threads.
- * @return
- */
- private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
- // Get the number of cores configured in property.
- int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
- // Get the minimum of number of cores and iterators size to get the number of parallel threads
- // to be launched.
- int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
-
- List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
- for (int i = 0; i < parallelThreadNumber; i++) {
- iterators[i] = new ArrayList<>();
- }
- // Equally partition the iterators as per number of threads
- for (int i = 0; i < inputIterators.length; i++) {
- iterators[i % parallelThreadNumber].add(inputIterators[i]);
- }
- return iterators;
- }
-
- @Override protected CarbonRow processRow(CarbonRow row) {
- return null;
- }
-
- @Override public void close() {
- if (!closed) {
- super.close();
- executorService.shutdown();
- for (CarbonIterator inputIterator : inputIterators) {
- inputIterator.close();
- }
- }
- }
-
- @Override protected String getStepName() {
- return "Input Processor";
- }
-
- /**
- * This iterator wraps the list of iterators and it starts iterating the each
- * iterator of the list one by one. It also parse the data while iterating it.
- */
- private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
-
- private List<CarbonIterator<Object[]>> inputIterators;
-
- private CarbonIterator<Object[]> currentIterator;
-
- private int counter;
-
- private int batchSize;
-
- private RowParser rowParser;
-
- private Future<CarbonRowBatch> future;
-
- private ExecutorService executorService;
-
- private boolean nextBatch;
-
- private boolean firstTime;
-
- private boolean preFetch;
-
- private AtomicLong rowCounter;
-
- public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
- RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService,
- AtomicLong rowCounter) {
- this.inputIterators = inputIterators;
- this.batchSize = batchSize;
- this.rowParser = rowParser;
- this.counter = 0;
- // Get the first iterator from the list.
- currentIterator = inputIterators.get(counter++);
- this.executorService = executorService;
- this.rowCounter = rowCounter;
- this.preFetch = preFetch;
- this.nextBatch = false;
- this.firstTime = true;
- }
-
- @Override
- public boolean hasNext() {
- return nextBatch || internalHasNext();
- }
-
- private boolean internalHasNext() {
- if (firstTime) {
- firstTime = false;
- currentIterator.initialize();
- }
- boolean hasNext = currentIterator.hasNext();
- // If iterator is finished then check for next iterator.
- if (!hasNext) {
- currentIterator.close();
- // Check next iterator is available in the list.
- if (counter < inputIterators.size()) {
- // Get the next iterator from the list.
- currentIterator = inputIterators.get(counter++);
- // Initialize the new iterator
- currentIterator.initialize();
- hasNext = internalHasNext();
- }
- }
- return hasNext;
- }
-
- @Override
- public CarbonRowBatch next() {
- if (preFetch) {
- return getCarbonRowBatchWithPreFetch();
- } else {
- return getBatch();
- }
- }
-
- private CarbonRowBatch getCarbonRowBatchWithPreFetch() {
- CarbonRowBatch result = null;
- if (future == null) {
- future = getCarbonRowBatch();
- }
- try {
- result = future.get();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- nextBatch = false;
- if (hasNext()) {
- nextBatch = true;
- future = getCarbonRowBatch();
- }
-
- return result;
- }
-
- private Future<CarbonRowBatch> getCarbonRowBatch() {
- return executorService.submit(new Callable<CarbonRowBatch>() {
- @Override public CarbonRowBatch call() throws Exception {
- return getBatch();
-
- }
- });
- }
-
- private CarbonRowBatch getBatch() {
- // Create batch and fill it.
- CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
- int count = 0;
- while (internalHasNext() && count < batchSize) {
- carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
- count++;
- }
- rowCounter.getAndAdd(carbonRowBatch.getSize());
- return carbonRowBatch;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
deleted file mode 100644
index 0b93b7a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
-import org.apache.carbondata.processing.newflow.sort.SorterFactory;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * It sorts the data and write them to intermediate temp files. These files will be further read
- * by next step for writing to carbondata files.
- */
-public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
- private Sorter sorter;
-
- public SortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
- AbstractDataLoadProcessorStep child) {
- super(configuration, child);
- }
-
- @Override
- public DataField[] getOutput() {
- return child.getOutput();
- }
-
- @Override
- public void initialize() throws IOException {
- super.initialize();
- child.initialize();
- SortParameters sortParameters = SortParameters.createSortParameters(configuration);
- sorter = SorterFactory.createSorter(configuration, rowCounter);
- sorter.initialize(sortParameters);
- }
-
- @Override
- public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
- final Iterator<CarbonRowBatch>[] iterators = child.execute();
- return sorter.sort(iterators);
- }
-
- @Override
- protected CarbonRow processRow(CarbonRow row) {
- return null;
- }
-
- @Override
- public void close() {
- if (!closed) {
- super.close();
- if (sorter != null) {
- sorter.close();
- }
- }
- }
-
- @Override protected String getStepName() {
- return "Sort Processor";
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java b/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java
new file mode 100644
index 0000000..649c18d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition;
+
+import java.util.List;
+
+public interface DataPartitioner {
+
+ /**
+ * All the partitions built by the Partitioner
+ */
+ List<Partition> getAllPartitions();
+
+ /**
+ * Identifies the partitions applicable for the given filter (API used for queries)
+ */
+ List<Partition> getPartitions();
+
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java b/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java
new file mode 100644
index 0000000..95cdfb3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface Partition extends Serializable {
+ /**
+ * unique identification for the partition in the cluster.
+ */
+ String getUniqueID();
+
+ /**
+ * Paths of the files associated with this partition.
+ *
+ * @return list of file paths for this partition
+ */
+ List<String> getFilesPath();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java
new file mode 100644
index 0000000..e533baf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.processing.partition.Partition;
+
+/**
+ * A sample load balancer to distribute the partitions to the available nodes in a round robin mode.
+ */
+public class DefaultLoadBalancer {
+ private Map<String, List<Partition>> nodeToPartitonMap =
+ new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ private Map<Partition, String> partitonToNodeMap =
+ new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
+ // Perform a round-robin allocation
+ int nodeCount = nodes.size();
+
+ int partitioner = 0;
+ for (Partition partition : partitions) {
+ int nodeindex = partitioner % nodeCount;
+ String node = nodes.get(nodeindex);
+
+ List<Partition> oldList = nodeToPartitonMap.get(node);
+ if (oldList == null) {
+ oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ nodeToPartitonMap.put(node, oldList);
+ }
+ oldList.add(partition);
+
+ partitonToNodeMap.put(partition, node);
+
+ partitioner++;
+ }
+ }
+
+ public String getNodeForPartitions(Partition partition) {
+ return partitonToNodeMap.get(partition);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java
new file mode 100644
index 0000000..c303efa
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.processing.partition.Partition;
+
+public class PartitionMultiFileImpl implements Partition {
+ private static final long serialVersionUID = -4363447826181193976L;
+ private String uniqueID;
+ private List<String> folderPath;
+
+ public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
+ this.uniqueID = uniqueID;
+ this.folderPath = folderPath;
+ }
+
+ @Override public String getUniqueID() {
+ // Return the identifier supplied at construction time
+ return uniqueID;
+ }
+
+ @Override public List<String> getFilesPath() {
+ // Return the path list supplied at construction time
+ return folderPath;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
new file mode 100644
index 0000000..4fb2414
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.processing.partition.DataPartitioner;
+import org.apache.carbondata.processing.partition.Partition;
+
+
+public final class QueryPartitionHelper {
+ private static QueryPartitionHelper instance = new QueryPartitionHelper();
+ private Map<String, DataPartitioner> partitionerMap =
+ new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ private Map<String, DefaultLoadBalancer> loadBalancerMap =
+ new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ private QueryPartitionHelper() {
+
+ }
+
+ public static QueryPartitionHelper getInstance() {
+ return instance;
+ }
+
+ /**
+ * Get partitions applicable for query based on filters applied in query
+ */
+ public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
+ String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
+
+ DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+ return dataPartitioner.getPartitions();
+ }
+
+ public List<Partition> getAllPartitions(String databaseName, String tableName) {
+ String tableUniqueName = databaseName + '_' + tableName;
+
+ DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+ return dataPartitioner.getAllPartitions();
+ }
+
+ /**
+ * Get the name of the node to which the partition is assigned.
+ */
+ public String getLocation(Partition partition, String databaseName, String tableName) {
+ String tableUniqueName = databaseName + '_' + tableName;
+
+ DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
+ return loadBalancer.getNodeForPartitions(partition);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java
new file mode 100644
index 0000000..92bd6ff
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.processing.partition.DataPartitioner;
+import org.apache.carbondata.processing.partition.Partition;
+
+/**
+ * Sample {@link DataPartitioner} that defines no partitions.
+ *
+ * <p>Both accessors return an empty, immutable list instead of {@code null}, so callers
+ * can iterate over the result without a null check.
+ */
+public class SampleDataPartitionerImpl implements DataPartitioner {
+
+  /**
+   * @return an empty list; this sample implementation has no partitions
+   */
+  @Override
+  public List<Partition> getAllPartitions() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * @return an empty list; this sample implementation has no partitions
+   */
+  @Override
+  public List<Partition> getPartitions() {
+    return Collections.emptyList();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
new file mode 100644
index 0000000..aeddac6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.spliter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Base class for executors that read carbon blocks back through the query engine.
+ *
+ * <p>A subclass is expected to set {@link #carbonTable} and {@link #segmentMapping}, build
+ * {@link #queryModel} with {@link #prepareQueryModel(List)}, run it with
+ * {@link #executeBlockList(List)} and call {@link #finish()} once the results are consumed.
+ */
+public abstract class AbstractCarbonQueryExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
+  protected CarbonTable carbonTable;
+  protected QueryModel queryModel;
+  /** Executor obtained by the most recent {@link #executeBlockList(List)} call. */
+  protected QueryExecutor queryExecutor;
+  protected Map<String, TaskBlockInfo> segmentMapping;
+  /**
+   * Executors obtained by {@link #executeBlockList(List)} and not yet finished. The method
+   * may be called several times (for example once per task), so remembering only the
+   * latest executor would leave the earlier ones unfinished.
+   */
+  private final List<QueryExecutor> startedExecutors =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  /**
+   * Installs the block list on {@link #queryModel}, obtains an executor for the model and
+   * executes it. The executor is remembered so that {@link #finish()} can release it.
+   *
+   * @param blockList blocks to read
+   * @return iterator over the batches produced by the executor
+   * @throws QueryExecutionException if the executor fails
+   * @throws IOException if the executor fails with an I/O error
+   */
+  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+      throws QueryExecutionException, IOException {
+    queryModel.setTableBlockInfos(blockList);
+    QueryExecutor executor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    // Track the executor before running it so finish() releases it even if execute() fails.
+    if (!startedExecutors.contains(executor)) {
+      startedExecutors.add(executor);
+    }
+    this.queryExecutor = executor;
+    return executor.execute(queryModel);
+  }
+
+  /**
+   * Builds a query model over all dimensions and measures that {@link #carbonTable} reports
+   * for its fact table, with forced detail raw query enabled and no filter.
+   *
+   * @param blockList blocks initially installed on the model (callers may pass {@code null}
+   *                  and install the blocks later)
+   * @return the prepared model
+   */
+  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+    QueryModel model = new QueryModel();
+    model.setTableBlockInfos(blockList);
+    model.setForcedDetailRawQuery(true);
+    model.setFilterExpressionResolverTree(null);
+
+    // Every dimension of the fact table is selected; no filtering of columns happens here.
+    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    for (CarbonDimension dim : dimensions) {
+      QueryDimension queryDimension = new QueryDimension(dim.getColName());
+      queryDimension.setDimension(dim);
+      dims.add(queryDimension);
+    }
+    model.setQueryDimension(dims);
+
+    // Likewise every measure of the fact table is selected.
+    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    for (CarbonMeasure carbonMeasure : measures) {
+      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+      queryMeasure.setMeasure(carbonMeasure);
+      msrs.add(queryMeasure);
+    }
+    model.setQueryMeasures(msrs);
+
+    model.setQueryId(String.valueOf(System.nanoTime()));
+    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+    model.setTable(carbonTable);
+    return model;
+  }
+
+  /**
+   * Cleanup: finishes every executor started through {@link #executeBlockList(List)} (plus
+   * {@link #queryExecutor} if a subclass assigned it directly) and then clears the
+   * dictionaries referenced by the query model. Safe to call when nothing was executed.
+   */
+  public void finish() {
+    if (null != queryExecutor && !startedExecutors.contains(queryExecutor)) {
+      startedExecutors.add(queryExecutor);
+    }
+    for (QueryExecutor executor : startedExecutors) {
+      try {
+        executor.finish();
+      } catch (QueryExecutionException e) {
+        // Keep going: the remaining executors and the dictionaries still need releasing.
+        LOGGER.error(e, "Problem while finish: ");
+      }
+    }
+    startedExecutors.clear();
+    clearDictionaryFromQueryModel();
+  }
+
+  /**
+   * This method will clear the dictionary access count after its usage is complete so
+   * that column can be deleted from LRU cache whenever memory reaches threshold
+   */
+  private void clearDictionaryFromQueryModel() {
+    if (null == queryModel) {
+      return;
+    }
+    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+    if (null == columnToDictionaryMapping) {
+      return;
+    }
+    for (Dictionary dictionary : columnToDictionaryMapping.values()) {
+      CarbonUtil.clearDictionaryCache(dictionary);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
new file mode 100644
index 0000000..6afec0b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.spliter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
+
+/**
+ * Reads the carbon blocks of a segment, task by task, when a partition is added or split.
+ */
+public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonSplitExecutor.class.getName());
+
+  /**
+   * @param segmentMapping task/block information, looked up by segment id
+   * @param carbonTable table whose blocks are read
+   */
+  public CarbonSplitExecutor(Map<String, TaskBlockInfo> segmentMapping, CarbonTable carbonTable) {
+    this.segmentMapping = segmentMapping;
+    this.carbonTable = carbonTable;
+  }
+
+  /**
+   * Executes one query per task of the given segment and wraps each result in a
+   * {@link PartitionSpliterRawResultIterator}.
+   *
+   * @param segmentId id used to look up the segment's tasks in the segment mapping
+   * @return one iterator per task, in the iteration order of the segment's task set
+   * @throws QueryExecutionException if executing a task's block list fails
+   * @throws IOException if executing a task's block list fails with an I/O error
+   */
+  public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
+      throws QueryExecutionException, IOException {
+    // The model is prepared without blocks; each task installs its own list below.
+    List<TableBlockInfo> taskBlocks = null;
+    queryModel = prepareQueryModel(taskBlocks);
+    List<PartitionSpliterRawResultIterator> iterators =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    TaskBlockInfo segmentTasks = segmentMapping.get(segmentId);
+    Set<String> taskIds = segmentTasks.getTaskSet();
+    for (String taskId : taskIds) {
+      taskBlocks = segmentTasks.getTableBlockInfoList(taskId);
+      LOGGER.info("for task -" + taskId + "-block size is -" + taskBlocks.size());
+      queryModel.setTableBlockInfos(taskBlocks);
+      iterators.add(new PartitionSpliterRawResultIterator(executeBlockList(taskBlocks)));
+    }
+    return iterators;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
new file mode 100644
index 0000000..1db414f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.partition.spliter;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.partition.spliter.exception.AlterPartitionSliceException;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Writes a list of result rows through a {@link CarbonFactDataHandlerColumnar}.
+ */
+public class RowResultProcessor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowResultProcessor.class.getName());
+
+  private CarbonFactHandler dataHandler;
+  private SegmentProperties segmentProperties;
+
+  /**
+   * Creates the temp store locations and builds the fact data handler that
+   * {@link #execute(List)} writes through.
+   *
+   * @param carbonTable table being written; its fact table name is passed to the handler model
+   * @param loadModel supplies the task number and fact timestamp of the data file attributes
+   * @param segProp segment properties used for the handler model and for row conversion
+   * @param tempStoreLocation locations created up front and handed to the handler model
+   * @param bucketId bucket id set on the handler model
+   */
+  public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
+      SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
+    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
+    this.segmentProperties = segProp;
+    String tableName = carbonTable.getFactTableName();
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+        CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
+            segProp, tableName, tempStoreLocation);
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+            loadModel.getFactTimeStamp());
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+    carbonFactDataHandlerModel.setBucketId(bucketId);
+    //Note: set compaction flow just to convert decimal type
+    carbonFactDataHandlerModel.setCompactionFlow(true);
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+  }
+
+  /**
+   * Initialises the handler, adds every row, finishes the handler and finally closes it.
+   *
+   * @param resultList rows to write
+   * @return {@code true} when all rows were written and the handler was closed without an
+   *         exception; {@code false} when adding a row or closing the handler failed
+   */
+  public boolean execute(List<Object[]> resultList) {
+    boolean processStatus;
+    // Set once initialise() has returned, so the handler is only closed if it was opened.
+    boolean handlerInitialised = false;
+
+    try {
+      dataHandler.initialise();
+      handlerInitialised = true;
+      for (Object[] row : resultList) {
+        addRow(row);
+      }
+      dataHandler.finish();
+      processStatus = true;
+    } catch (AlterPartitionSliceException e) {
+      // One log entry carrying both the context message and the stack trace.
+      LOGGER.error(e, "Exception in executing RowResultProcessor" + e.getMessage());
+      processStatus = false;
+    } finally {
+      try {
+        if (handlerInitialised) {
+          dataHandler.closeHandler();
+        }
+      } catch (Exception e) {
+        // Pass the exception itself so the stack trace of the close failure is not lost.
+        LOGGER.error(e,
+            "Exception while closing the handler in RowResultProcessor" + e.getMessage());
+        processStatus = false;
+      }
+    }
+    return processStatus;
+  }
+
+  /**
+   * Converts one result tuple to a {@link CarbonRow} and adds it to the handler.
+   *
+   * @param carbonTuple raw row as delivered in the result list
+   * @throws AlterPartitionSliceException wrapping the writer exception when the add fails
+   */
+  private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException {
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+    try {
+      this.dataHandler.addDataToStore(row);
+    } catch (CarbonDataWriterException e) {
+      throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java
new file mode 100644
index 0000000..21b53cf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.spliter.exception;
+
+import java.util.Locale;
+
+/**
+ * Checked exception of the partition splitter package; it carries the message handed to
+ * its constructor.
+ */
+public class AlterPartitionSliceException extends Exception {
+
+  /** default serial version ID. */
+  private static final long serialVersionUID = 1L;
+
+  /** Message handed to the constructor; returned by {@link #getMessage()}. */
+  private final String msg;
+
+  /**
+   * Creates the exception without a cause.
+   *
+   * @param msg The error message for this exception.
+   */
+  public AlterPartitionSliceException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Creates the exception with a cause.
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public AlterPartitionSliceException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Locale-specific variant of the message. No localisation is implemented: the result is
+   * always the empty string.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - always {@code ""}.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Delegates to {@link Exception#getLocalizedMessage()}.
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * @return the message handed to the constructor
+   */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
deleted file mode 100644
index 31c2b4f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import java.io.Serializable;
-
-public class SortObserver implements Serializable {
- /**
- * is failed
- */
- private boolean isFailed;
-
- /**
- * @return the isFailed
- */
- public boolean isFailed() {
- return isFailed;
- }
-
- /**
- * @param isFailed the isFailed to set
- */
- public void setFailed(boolean isFailed) {
- this.isFailed = isFailed;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java b/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java
new file mode 100644
index 0000000..292cdb3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.exception;
+
+import java.util.Locale;
+
+/**
+ * Checked exception of the sort step; it carries either the message handed to its
+ * constructor or, when built from a cause only, the message derived from that cause.
+ */
+public class CarbonSortKeyAndGroupByException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message returned by {@link #getMessage()}.
+   */
+  private final String msg;
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonSortKeyAndGroupByException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public CarbonSortKeyAndGroupByException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor wrapping a cause without an explicit message.
+   *
+   * @param t The cause of this exception.
+   */
+  public CarbonSortKeyAndGroupByException(Throwable t) {
+    super(t);
+    // Throwable(Throwable) derives the detail message from the cause (cause.toString()).
+    // Copy it, otherwise getMessage() would report an empty string for every wrapped cause.
+    String derived = super.getMessage();
+    this.msg = derived == null ? "" : derived;
+  }
+
+  /**
+   * Locale-specific variant of the message. No localisation is implemented: the result is
+   * always the empty string.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - always {@code ""}.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Delegates to {@link Exception#getLocalizedMessage()}.
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * @return the explicit message, or the message derived from the cause when the
+   *         exception was created from a cause only
+   */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
new file mode 100644
index 0000000..1302a5b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Shared state and stream handling for {@link TempSortFileWriter} implementations: keeps
+ * the column counts and buffer size, opens the output stream and closes it again.
+ */
+public abstract class AbstractTempSortFileWriter implements TempSortFileWriter {
+
+  /** Buffer size, in bytes, of the buffered stream opened on the sort temp file. */
+  protected int writeBufferSize;
+
+  /** Measure count */
+  protected int measureCount;
+
+  /** Dimension count */
+  protected int dimensionCount;
+
+  /** complexDimension count */
+  protected int complexDimensionCount;
+
+  /** Stream on the file passed to {@code initiaize}; closed by {@link #finish()}. */
+  protected DataOutputStream stream;
+
+  /** noDictionaryCount */
+  protected int noDictionaryCount;
+
+  /**
+   * AbstractTempSortFileWriter
+   *
+   * @param dimensionCount        value stored in {@link #dimensionCount}
+   * @param complexDimensionCount value stored in {@link #complexDimensionCount}
+   * @param measureCount          value stored in {@link #measureCount}
+   * @param noDictionaryCount     value stored in {@link #noDictionaryCount}
+   * @param writeBufferSize       value stored in {@link #writeBufferSize}
+   */
+  public AbstractTempSortFileWriter(int dimensionCount, int complexDimensionCount, int measureCount,
+      int noDictionaryCount, int writeBufferSize) {
+    this.dimensionCount = dimensionCount;
+    this.complexDimensionCount = complexDimensionCount;
+    this.measureCount = measureCount;
+    this.noDictionaryCount = noDictionaryCount;
+    this.writeBufferSize = writeBufferSize;
+  }
+
+  /**
+   * Opens a buffered data stream on the file and writes the entry count as its first int.
+   *
+   * @param file       file to write to
+   * @param entryCount value written at the start of the stream
+   * @throws CarbonSortKeyAndGroupByException wrapping the failure to open or write the file
+   */
+  @Override public void initiaize(File file, int entryCount)
+      throws CarbonSortKeyAndGroupByException {
+    try {
+      FileOutputStream fileOut = new FileOutputStream(file);
+      stream = new DataOutputStream(new BufferedOutputStream(fileOut, writeBufferSize));
+      stream.writeInt(entryCount);
+    } catch (FileNotFoundException notFound) {
+      throw new CarbonSortKeyAndGroupByException(notFound);
+    } catch (IOException writeFailure) {
+      throw new CarbonSortKeyAndGroupByException(writeFailure);
+    }
+  }
+
+  /**
+   * Closes the stream through {@code CarbonUtil.closeStreams}.
+   */
+  @Override public void finish() {
+    CarbonUtil.closeStreams(stream);
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
new file mode 100644
index 0000000..40f650d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Sort temp file writer that serialises a batch of records into memory, compresses the
+ * bytes and appends them to the stream of {@link AbstractTempSortFileWriter}.
+ */
+public class CompressedTempSortFileWriter extends AbstractTempSortFileWriter {
+
+  /**
+   * CompressedTempSortFileWriter
+   *
+   * @param dimensionCount        passed to the superclass constructor
+   * @param complexDimensionCount passed to the superclass constructor
+   * @param measureCount          passed to the superclass constructor
+   * @param noDictionaryCount     passed to the superclass constructor
+   * @param writeBufferSize       passed to the superclass constructor
+   */
+  public CompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
+      int measureCount, int noDictionaryCount, int writeBufferSize) {
+    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
+  }
+
+  /**
+   * Writes one batch to the sort temp file as: record count (int), compressed length (int),
+   * compressed bytes.
+   *
+   * @param records rows of the batch
+   * @throws CarbonSortKeyAndGroupByException wrapping any {@link IOException} raised while
+   *                                          serialising or writing
+   */
+  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
+    ByteArrayOutputStream rawBytes = null;
+    DataOutputStream rawOut = null;
+    try {
+      // Only the initial capacity of the in-memory buffer: it is computed from the measure
+      // and dimension counts alone, and the buffer grows if more bytes are written.
+      int bytesPerRecord = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE)
+          + (dimensionCount * CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      int initialCapacity = records.length * bytesPerRecord;
+
+      rawBytes = new ByteArrayOutputStream(initialCapacity);
+      rawOut = new DataOutputStream(rawBytes);
+
+      UnCompressedTempSortFileWriter.writeDataOutputStream(records, rawOut, measureCount,
+          dimensionCount, noDictionaryCount, complexDimensionCount);
+
+      stream.writeInt(records.length);
+      byte[] compressed =
+          CompressorFactory.getInstance().getCompressor().compressByte(rawBytes.toByteArray());
+      stream.writeInt(compressed.length);
+      stream.write(compressed);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(e);
+    } finally {
+      CarbonUtil.closeStreams(rawBytes);
+      CarbonUtil.closeStreams(rawOut);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
new file mode 100644
index 0000000..ffe6fb6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.AbstractQueue;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+ /**
+  * Merges a group of intermediate sort temp files into one output file.
+  *
+  * <p>Each input file is wrapped in a {@link SortTempFileChunkHolder}. The holders are kept in a
+  * priority queue, and the next row to write is always taken from the holder at the head of that
+  * queue. Rows are either streamed straight to the output file or, when sort file compression or
+  * prefetch is enabled, collected in batches and handed to a {@link TempSortFileWriter}.
+  *
+  * <p>On success the intermediate files are deleted. On failure the partially written output
+  * file is deleted instead and the intermediate files are left in place. Failures are logged and
+  * not rethrown from {@link #run()}.
+  */
+ public class IntermediateFileMerger implements Runnable {
+   /**
+    * LOGGER
+    */
+   private static final LogService LOGGER =
+       LogServiceFactory.getLogService(IntermediateFileMerger.class.getName());
+
+   /**
+    * Heap of chunk holders, one per intermediate file that still has rows to merge.
+    */
+   private AbstractQueue<SortTempFileChunkHolder> recordHolderHeap;
+
+   /**
+    * Number of intermediate files that have not been fully consumed yet.
+    */
+   private int fileCounter;
+
+   /**
+    * Output stream used when rows are written directly (no compression and no prefetch).
+    */
+   private DataOutputStream stream;
+
+   /**
+    * Sum of the entry counts reported by all intermediate files.
+    */
+   private int totalNumberOfRecords;
+
+   /**
+    * Batch of rows waiting to be handed to the batch writer.
+    */
+   private Object[][] records;
+
+   /**
+    * Number of rows currently buffered in the batch.
+    */
+   private int entryCount;
+
+   /**
+    * Batch writer used when compression or prefetch is enabled.
+    */
+   private TempSortFileWriter writer;
+
+   /**
+    * Capacity of one batch of rows.
+    */
+   private int totalSize;
+
+   private final SortParameters mergerParameters;
+
+   private final File[] intermediateFiles;
+
+   private final File outPutFile;
+
+   private final boolean[] noDictionarycolumnMapping;
+
+   /**
+    * IntermediateFileMerger Constructor
+    *
+    * @param mergerParameters  sort configuration used for reading and writing the temp files
+    * @param intermediateFiles sort temp files to merge
+    * @param outPutFile        file that receives the merged rows
+    */
+   public IntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
+       File outPutFile) {
+     this.mergerParameters = mergerParameters;
+     this.fileCounter = intermediateFiles.length;
+     this.intermediateFiles = intermediateFiles;
+     this.outPutFile = outPutFile;
+     noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
+   }
+
+   /**
+    * Runs the merge. Any exception is logged and marks the merge as failed; it is not rethrown.
+    */
+   @Override
+   public void run() {
+     long intermediateMergeStartTime = System.currentTimeMillis();
+     int fileConterConst = fileCounter;
+     boolean isFailed = false;
+     try {
+       startSorting();
+       initialize();
+       while (hasNext()) {
+         writeDataTofile(next());
+       }
+       flushBufferedRecords();
+       double intermediateMergeCostTime =
+           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
+       LOGGER.info("============================== Intermediate Merge of " + fileConterConst +
+           " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
+     } catch (Exception e) {
+       LOGGER.error(e, "Problem while intermediate merging");
+       isFailed = true;
+     } finally {
+       records = null;
+       CarbonUtil.closeStreams(this.stream);
+       if (null != writer) {
+         writer.finish();
+       }
+       if (!isFailed) {
+         try {
+           finish();
+         } catch (CarbonSortKeyAndGroupByException e) {
+           LOGGER.error(e, "Problem while deleting the merge file");
+         }
+       } else {
+         // The inputs are kept on failure, but their open holders must still be released.
+         try {
+           closeChunkHolders();
+         } catch (CarbonSortKeyAndGroupByException e) {
+           LOGGER.error(e, "Problem while closing the sort temp file chunk holders");
+         }
+         // Remove the partial output; complain only when a file exists and cannot be removed.
+         if (outPutFile.exists() && !outPutFile.delete()) {
+           LOGGER.error("Problem while deleting the merge file");
+         }
+       }
+     }
+   }
+
+   /**
+    * Tells whether rows are collected in batches and written through the batch writer
+    * (compression or prefetch enabled) instead of being streamed row by row.
+    */
+   private boolean isBufferedWrite() {
+     return mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch();
+   }
+
+   /**
+    * Writes the rows still sitting in the batch buffer once all input rows were consumed.
+    * Does nothing in row-by-row mode or when the buffer is empty.
+    *
+    * @throws CarbonSortKeyAndGroupByException problem while writing
+    */
+   private void flushBufferedRecords() throws CarbonSortKeyAndGroupByException {
+     if (!isBufferedWrite() || entryCount == 0) {
+       return;
+     }
+     if (entryCount < totalSize) {
+       // trim the batch so the writer does not see the unused trailing slots
+       Object[][] temp = new Object[entryCount][];
+       System.arraycopy(records, 0, temp, 0, entryCount);
+       records = temp;
+     }
+     this.writer.writeSortTempFile(records);
+     entryCount = 0;
+   }
+
+   /**
+    * This method is responsible for initializing the out stream, or the batch writer when
+    * compression or prefetch is enabled.
+    *
+    * @throws CarbonSortKeyAndGroupByException
+    */
+   private void initialize() throws CarbonSortKeyAndGroupByException {
+     if (isBufferedWrite()) {
+       writer = TempSortFileWriterFactory.getInstance()
+           .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
+               mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
+               mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
+               mergerParameters.getFileWriteBufferSize());
+       writer.initiaize(outPutFile, totalNumberOfRecords);
+
+       if (mergerParameters.isPrefetch()) {
+         totalSize = mergerParameters.getBufferSize();
+       } else {
+         totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression();
+       }
+       return;
+     }
+     try {
+       this.stream = new DataOutputStream(
+           new BufferedOutputStream(new FileOutputStream(outPutFile),
+               mergerParameters.getFileWriteBufferSize()));
+       // the file starts with the total number of records that follow
+       this.stream.writeInt(this.totalNumberOfRecords);
+     } catch (FileNotFoundException e) {
+       throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
+     } catch (IOException e) {
+       throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
+     }
+   }
+
+   /**
+    * This method will be used to get the sorted record from file
+    *
+    * @return sorted record
+    * @throws CarbonSortKeyAndGroupByException
+    */
+   private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
+     // poll the top object from heap
+     // heap maintains binary tree which is based on heap condition that will
+     // be based on the ordering of the chunk holders
+     // when we call poll it will always delete the root of the tree and then
+     // do a trickle down operation, complexity is log(n)
+     SortTempFileChunkHolder poll = this.recordHolderHeap.poll();
+
+     // get the row from chunk
+     Object[] row = poll.getRow();
+
+     // check if there is no entry present
+     if (!poll.hasNext()) {
+       // if chunk is empty then close the stream
+       poll.closeStream();
+
+       // change the file counter
+       --this.fileCounter;
+
+       // return row
+       return row;
+     }
+
+     // read new row
+     poll.readRow();
+
+     // add to heap
+     this.recordHolderHeap.add(poll);
+
+     // return row
+     return row;
+   }
+
+   /**
+    * Below method will be used to start the sorting process. It creates the
+    * record holder heap, then creates a chunk holder for every intermediate
+    * file, reads the first record from each file and adds the holder to the heap
+    *
+    * @throws CarbonSortKeyAndGroupByException
+    */
+   private void startSorting() throws CarbonSortKeyAndGroupByException {
+     LOGGER.info("Number of temp file: " + this.fileCounter);
+
+     // create record holder heap
+     createRecordHolderQueue(intermediateFiles);
+
+     // iterate over file list and create chunk holder and add to heap
+     LOGGER.info("Started adding first record from each file");
+
+     for (File tempFile : intermediateFiles) {
+       // create chunk holder
+       SortTempFileChunkHolder sortTempFileChunkHolder =
+           new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
+               mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
+               mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
+               mergerParameters.getMeasureDataType(),
+               mergerParameters.getNoDictionaryDimnesionColumn(),
+               mergerParameters.getNoDictionarySortColumn());
+
+       // initialize
+       sortTempFileChunkHolder.initialize();
+       sortTempFileChunkHolder.readRow();
+       this.totalNumberOfRecords += sortTempFileChunkHolder.getEntryCount();
+
+       // add to heap
+       this.recordHolderHeap.add(sortTempFileChunkHolder);
+     }
+
+     LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+   }
+
+   /**
+    * This method will be used to create the heap which will be used to hold
+    * the chunk of data
+    *
+    * @param listFiles list of temp files
+    */
+   private void createRecordHolderQueue(File[] listFiles) {
+     // creating record holder heap
+     this.recordHolderHeap = new PriorityQueue<>(listFiles.length);
+   }
+
+   /**
+    * This method will be used to get the sorted row
+    *
+    * @return sorted row
+    * @throws CarbonSortKeyAndGroupByException
+    */
+   private Object[] next() throws CarbonSortKeyAndGroupByException {
+     return getSortedRecordFromFile();
+   }
+
+   /**
+    * This method will be used to check whether any more element is present or
+    * not
+    *
+    * @return more element is present
+    */
+   private boolean hasNext() {
+     return this.fileCounter > 0;
+   }
+
+   /**
+    * Below method will be used to write data to file. In batch mode the row is only buffered
+    * and the batch is written once it is full; otherwise the row is written immediately.
+    *
+    * @param row row to write
+    * @throws CarbonSortKeyAndGroupByException problem while writing
+    */
+   private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
+     if (isBufferedWrite()) {
+       if (entryCount == 0) {
+         // start a fresh batch
+         records = new Object[totalSize][];
+       }
+       records[entryCount++] = row;
+       // Checked after every insert, including the first one, so that a batch capacity of 1
+       // is flushed instead of overflowing on the following row.
+       if (entryCount == totalSize) {
+         this.writer.writeSortTempFile(records);
+         entryCount = 0;
+         records = null;
+       }
+       return;
+     }
+     try {
+       writeRowToStream(row);
+     } catch (IOException e) {
+       throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+     }
+   }
+
+   /**
+    * Writes a single row to the direct output stream: first the dimensions in column order,
+    * then the measures, each measure preceded by a one-byte null marker (1 = value follows,
+    * 0 = null).
+    *
+    * @param row row whose element 0 holds the dictionary surrogates and element 1 the
+    *            no-dictionary values
+    * @throws IOException problem while writing to the stream
+    */
+   private void writeRowToStream(Object[] row) throws IOException {
+     DataType[] aggType = mergerParameters.getMeasureDataType();
+     int[] mdkArray = (int[]) row[0];
+     byte[][] nonDictArray = (byte[][]) row[1];
+     int mdkIndex = 0;
+     int nonDictKeyIndex = 0;
+     // write dictionary and non dictionary dimensions here.
+     for (boolean nodictinary : noDictionarycolumnMapping) {
+       if (nodictinary) {
+         // no-dictionary value: length (as short) followed by the bytes
+         byte[] col = nonDictArray[nonDictKeyIndex++];
+         stream.writeShort(col.length);
+         stream.write(col);
+       } else {
+         stream.writeInt(mdkArray[mdkIndex++]);
+       }
+     }
+
+     int measureCount = mergerParameters.getMeasureColCount();
+     for (int counter = 0; counter < measureCount; counter++) {
+       Object measure = NonDictionaryUtil.getMeasure(counter, row);
+       if (null == measure) {
+         stream.write((byte) 0);
+         continue;
+       }
+       stream.write((byte) 1);
+       switch (aggType[counter]) {
+         case SHORT:
+           stream.writeShort((short) measure);
+           break;
+         case INT:
+           stream.writeInt((int) measure);
+           break;
+         case LONG:
+           stream.writeLong((long) measure);
+           break;
+         case DOUBLE:
+           stream.writeDouble((Double) measure);
+           break;
+         case DECIMAL:
+           // decimals arrive already serialized: length followed by the bytes
+           byte[] bigDecimalInBytes = (byte[]) measure;
+           stream.writeInt(bigDecimalInBytes.length);
+           stream.write(bigDecimalInBytes);
+           break;
+         default:
+           throw new IllegalArgumentException("unsupported data type:" + aggType[counter]);
+       }
+     }
+   }
+
+   /**
+    * Closes every chunk holder that is still in the heap and removes it from the heap.
+    *
+    * @throws CarbonSortKeyAndGroupByException declared so that a failure reported while
+    *                                          closing a holder can reach the caller
+    */
+   private void closeChunkHolders() throws CarbonSortKeyAndGroupByException {
+     if (recordHolderHeap == null) {
+       return;
+     }
+     SortTempFileChunkHolder holder;
+     while ((holder = recordHolderHeap.poll()) != null) {
+       holder.closeStream();
+     }
+   }
+
+   /**
+    * Releases the remaining chunk holders and deletes the intermediate files after a
+    * successful merge.
+    *
+    * @throws CarbonSortKeyAndGroupByException if the intermediate files cannot be deleted
+    */
+   private void finish() throws CarbonSortKeyAndGroupByException {
+     closeChunkHolders();
+     try {
+       CarbonUtil.deleteFiles(intermediateFiles);
+     } catch (IOException e) {
+       // keep the cause so the reason for the failed delete is not lost
+       throw new CarbonSortKeyAndGroupByException(
+           "Problem while deleting the intermediate files", e);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
new file mode 100644
index 0000000..d2579d2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+ /**
+  * Compares two rows on their sort columns. The sort column at position {@code i} is read from
+  * index {@code i} of each row: as a {@code byte[]} when the column is flagged as no-dictionary,
+  * otherwise as an {@code int} surrogate.
+  */
+ public class NewRowComparator implements Comparator<Object[]> {
+
+   /**
+    * mapping of dictionary dimensions and no dictionary of sort_column.
+    */
+   private final boolean[] noDictionarySortColumnMaping;
+
+   /**
+    * @param noDictionarySortColumnMaping one flag per sort column, {@code true} when the column
+    *                                     is a no-dictionary column
+    */
+   public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
+     this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+   }
+
+   /**
+    * Below method will be used to compare two rows column by column; the first sort column
+    * that differs decides the result.
+    *
+    * @return a negative value, zero or a positive value when rowA sorts before, equal to or
+    *         after rowB
+    */
+   @Override
+   public int compare(Object[] rowA, Object[] rowB) {
+     int index = 0;
+
+     for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+       int difference;
+       if (isNoDictionary) {
+         byte[] byteArr1 = (byte[]) rowA[index];
+         byte[] byteArr2 = (byte[]) rowB[index];
+         difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+       } else {
+         int dimFieldA = (int) rowA[index];
+         int dimFieldB = (int) rowB[index];
+         // Integer.compare instead of subtraction: a difference can overflow and flip the sign
+         difference = Integer.compare(dimFieldA, dimFieldB);
+       }
+       if (difference != 0) {
+         return difference;
+       }
+       index++;
+     }
+
+     return 0;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
new file mode 100644
index 0000000..e01b587
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+/**
+ * This class is used as comparator for comparing dims which are non high cardinality dims.
+ * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
+ */
+public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
+ /**
+ * dimension count
+ */
+ private int numberOfSortColumns;
+
+ /**
+ * RowComparatorForNormalDims Constructor
+ *
+ * @param numberOfSortColumns
+ */
+ public NewRowComparatorForNormalDims(int numberOfSortColumns) {
+ this.numberOfSortColumns = numberOfSortColumns;
+ }
+
+ /**
+ * Below method will be used to compare two surrogate keys
+ *
+ * @see Comparator#compare(Object, Object)
+ */
+ public int compare(Object[] rowA, Object[] rowB) {
+ int diff = 0;
+
+ for (int i = 0; i < numberOfSortColumns; i++) {
+
+ int dimFieldA = (int)rowA[i];
+ int dimFieldB = (int)rowB[i];
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ return diff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
new file mode 100644
index 0000000..0ae0b93
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+ /**
+  * Compares two rows on their sort columns. Dictionary columns are read through
+  * {@code NonDictionaryUtil.getDimension}; no-dictionary columns are compared on the packed
+  * byte array stored at {@code WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX}.
+  */
+ public class RowComparator implements Comparator<Object[]> {
+   /**
+    * noDictionaryCount represent number of no dictionary cols
+    */
+   private final int noDictionaryCount;
+
+   /**
+    * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+    */
+   private final boolean[] noDictionarySortColumnMaping;
+
+   /**
+    * @param noDictionarySortColumnMaping one flag per sort column, {@code true} when the column
+    *                                     is a no-dictionary column
+    * @param noDictionaryCount            number of no dictionary columns
+    */
+   public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
+     this.noDictionaryCount = noDictionaryCount;
+     this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+   }
+
+   /**
+    * Below method will be used to compare two rows column by column; the first sort column
+    * that differs decides the result.
+    *
+    * @return a negative value, zero or a positive value when rowA sorts before, equal to or
+    *         after rowB
+    */
+   @Override
+   public int compare(Object[] rowA, Object[] rowB) {
+     int normalIndex = 0;
+     int noDictionaryindex = 0;
+
+     for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+
+       if (isNoDictionary) {
+         byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+         ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
+
+         // extract a high card dims from complete byte[].
+         // NOTE(review): presumably this narrows buff1 to the selected column - confirm
+         NonDictionaryUtil
+             .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
+
+         byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+         ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
+
+         // extract a high card dims from complete byte[].
+         NonDictionaryUtil
+             .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
+
+         int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
+         if (difference != 0) {
+           return difference;
+         }
+         noDictionaryindex++;
+       } else {
+         int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
+         int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
+         // Integer.compare instead of subtraction: a difference can overflow and flip the sign
+         int diff = Integer.compare(dimFieldA, dimFieldB);
+         if (diff != 0) {
+           return diff;
+         }
+         normalIndex++;
+       }
+
+     }
+
+     return 0;
+   }
+ }