You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:23 UTC
[36/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
new file mode 100644
index 0000000..f5f112c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.row;
+
+import java.util.NoSuchElementException;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+
+
+/**
+ * Batch of rows.
+ */
+public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
+
+ private CarbonRow[] rowBatch;
+
+ private int size = 0;
+
+ private int index = 0;
+
+ public CarbonRowBatch(int batchSize) {
+ this.rowBatch = new CarbonRow[batchSize];
+ }
+
+ public void addRow(CarbonRow carbonRow) {
+ rowBatch[size++] = carbonRow;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ @Override public boolean hasNext() {
+ return index < size;
+ }
+
+ @Override
+ public CarbonRow next() throws NoSuchElementException {
+ if (hasNext()) {
+ return rowBatch[index++];
+ }
+ throw new NoSuchElementException("no more elements to iterate");
+ }
+
+ @Override public void remove() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java
new file mode 100644
index 0000000..5d39145
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.row;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+
+/**
+ * Batch of sorted rows which are ready to be processed by
+ */
+public class CarbonSortBatch extends CarbonRowBatch {
+
+ private UnsafeSingleThreadFinalSortFilesMerger iterator;
+
+ public CarbonSortBatch(UnsafeSingleThreadFinalSortFilesMerger iterator) {
+ super(0);
+ this.iterator = iterator;
+ }
+
+ @Override public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override public CarbonRow next() {
+ return new CarbonRow(iterator.next());
+ }
+
+ @Override public void close() {
+ iterator.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java
new file mode 100644
index 0000000..550fe70
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.sort.impl.ThreadStatusObserver;
+
+/**
+ * The class defines the common methods used in across various type of sort
+ */
+public abstract class AbstractMergeSorter implements Sorter {
+ /**
+ * instance of thread status observer
+ */
+ protected ThreadStatusObserver threadStatusObserver;
+
+ /**
+ * Below method will be used to check error in exception
+ */
+ public void checkError() {
+ if (threadStatusObserver.getThrowable() != null) {
+ if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) {
+ throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
+ } else {
+ throw new CarbonDataLoadingException(threadStatusObserver.getThrowable());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
new file mode 100644
index 0000000..23179fa
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Sort scope options
+ */
+public class SortScopeOptions {
+
+ public static SortScope getSortScope(String sortScope) {
+ if (sortScope == null) {
+ sortScope = CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT;
+ }
+ switch (sortScope.toUpperCase()) {
+ case "BATCH_SORT":
+ return SortScope.BATCH_SORT;
+ case "LOCAL_SORT":
+ return SortScope.LOCAL_SORT;
+ case "GLOBAL_SORT":
+ return SortScope.GLOBAL_SORT;
+ case "NO_SORT":
+ return SortScope.NO_SORT;
+ default:
+ return SortScope.LOCAL_SORT;
+ }
+ }
+
+ public static boolean isValidSortOption(String sortScope) {
+ return CarbonUtil.isValidSortOption(sortScope);
+ }
+
+ public enum SortScope {
+ NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
new file mode 100644
index 0000000..9665487
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+public class SortStepRowUtil {
+ public static Object[] convertRow(Object[] data, SortParameters parameters) {
+ int measureCount = parameters.getMeasureColCount();
+ int dimensionCount = parameters.getDimColCount();
+ int complexDimensionCount = parameters.getComplexDimColCount();
+ int noDictionaryCount = parameters.getNoDictionaryCount();
+ boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+
+ // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
+
+ Object[] holder = new Object[3];
+ int index = 0;
+ int nonDicIndex = 0;
+ int allCount = 0;
+ int[] dim = new int[dimensionCount];
+ byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
+ Object[] measures = new Object[measureCount];
+ try {
+ // read dimension values
+ for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+ if (isNoDictionaryDimensionColumn[i]) {
+ nonDicArray[nonDicIndex++] = (byte[]) data[i];
+ } else {
+ dim[index++] = (int) data[allCount];
+ }
+ allCount++;
+ }
+
+ for (int i = 0; i < complexDimensionCount; i++) {
+ nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
+ allCount++;
+ }
+
+ index = 0;
+
+ // read measure values
+ for (int i = 0; i < measureCount; i++) {
+ measures[index++] = data[allCount];
+ allCount++;
+ }
+
+ NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+
+ // increment number if record read
+ } catch (Exception e) {
+ throw new RuntimeException("Problem while converting row ", e);
+ }
+
+ //return out row
+ return holder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java
new file mode 100644
index 0000000..9a47e50
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+/**
+ * This interface sorts all the data of iterators.
+ * The life cycle of this interface is initialize -> sort -> close
+ */
+public interface Sorter {
+
+ /**
+ * Initialize sorter with sort parameters.
+ *
+ * @param sortParameters
+ */
+ void initialize(SortParameters sortParameters);
+
+ /**
+ * Sorts the data of all iterators, this iterators can be
+ * read parallely depends on implementation.
+ *
+ * @param iterators array of iterators to read data.
+ * @return
+ * @throws CarbonDataLoadingException
+ */
+ Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+ throws CarbonDataLoadingException;
+
+ /**
+ * Close resources
+ */
+ void close();
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
new file mode 100644
index 0000000..a8f0282
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.loading.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class SorterFactory {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(SorterFactory.class.getName());
+
+ public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
+ boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
+ SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+ Sorter sorter;
+ if (offheapsort) {
+ if (configuration.getBucketingInfo() != null) {
+ sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
+ configuration.getBucketingInfo());
+ } else {
+ sorter = new UnsafeParallelReadMergeSorterImpl(counter);
+ }
+ } else {
+ if (configuration.getBucketingInfo() != null) {
+ sorter =
+ new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
+ } else {
+ sorter = new ParallelReadMergeSorterImpl(counter);
+ }
+ }
+ if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
+ if (configuration.getBucketingInfo() == null) {
+ sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
+ } else {
+ LOGGER.warn(
+ "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
+ .getName());
+ }
+ }
+ return sorter;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..6e43fcb
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be merge sorted to get
+ * final merge sort result.
+ */
+public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
+
+ private SortParameters sortParameters;
+
+ private SortIntermediateFileMerger intermediateFileMerger;
+
+ private SingleThreadFinalSortFilesMerger finalMerger;
+
+ private AtomicLong rowCounter;
+
+ public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+ this.rowCounter = rowCounter;
+ }
+
+ @Override
+ public void initialize(SortParameters sortParameters) {
+ this.sortParameters = sortParameters;
+ intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
+ String[] storeLocations =
+ CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ sortParameters.getDatabaseName(), sortParameters.getTableName(),
+ String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
+ sortParameters.getSegmentId() + "", false, false);
+ // Set the data file location
+ String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
+ File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ finalMerger =
+ new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(),
+ sortParameters.getDimColCount(),
+ sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
+ sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
+ sortParameters.getNoDictionaryDimnesionColumn(),
+ sortParameters.getNoDictionarySortColumn());
+ }
+
+ @Override
+ public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+ throws CarbonDataLoadingException {
+ SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger);
+ final int batchSize = CarbonProperties.getInstance().getBatchSize();
+ try {
+ sortDataRow.initialize();
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(executorService);
+
+ try {
+ for (int i = 0; i < iterators.length; i++) {
+ executorService.execute(
+ new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
+ threadStatusObserver));
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(2, TimeUnit.DAYS);
+ processRowToNextStep(sortDataRow, sortParameters);
+ } catch (Exception e) {
+ checkError();
+ throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+ }
+ checkError();
+ try {
+ intermediateFileMerger.finish();
+ intermediateFileMerger = null;
+ finalMerger.startFinalMerge();
+ } catch (CarbonDataWriterException e) {
+ throw new CarbonDataLoadingException(e);
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+
+ // Creates the iterator to read from merge sorter.
+ Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
+
+ @Override
+ public boolean hasNext() {
+ return finalMerger.hasNext();
+ }
+
+ @Override
+ public CarbonRowBatch next() {
+ int counter = 0;
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+ while (finalMerger.hasNext() && counter < batchSize) {
+ rowBatch.addRow(new CarbonRow(finalMerger.next()));
+ counter++;
+ }
+ return rowBatch;
+ }
+ };
+ return new Iterator[] { batchIterator };
+ }
+
+ @Override public void close() {
+ if (intermediateFileMerger != null) {
+ intermediateFileMerger.close();
+ }
+ }
+
+ /**
+ * Below method will be used to process data to next step
+ */
+ private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
+ throws CarbonDataLoadingException {
+ if (null == sortDataRows) {
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ LOGGER.info("Number of Records was Zero");
+ String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+ LOGGER.info(logMessage);
+ return false;
+ }
+
+ try {
+ // start sorting
+ sortDataRows.startSorting();
+
+ // check any more rows are present
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+ System.currentTimeMillis());
+ return false;
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ /**
+ * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
+ */
+ private static class SortIteratorThread implements Runnable {
+
+ private Iterator<CarbonRowBatch> iterator;
+
+ private SortDataRows sortDataRows;
+
+ private Object[][] buffer;
+
+ private AtomicLong rowCounter;
+
+ private ThreadStatusObserver observer;
+
+ public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows,
+ int batchSize, AtomicLong rowCounter, ThreadStatusObserver observer) {
+ this.iterator = iterator;
+ this.sortDataRows = sortDataRows;
+ this.buffer = new Object[batchSize][];
+ this.rowCounter = rowCounter;
+ this.observer = observer;
+
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (iterator.hasNext()) {
+ CarbonRowBatch batch = iterator.next();
+ int i = 0;
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ if (row != null) {
+ buffer[i++] = row.getData();
+ }
+ }
+ if (i > 0) {
+ sortDataRows.addRowBatch(buffer, i);
+ rowCounter.getAndAdd(i);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e);
+ observer.notifyFailed(e);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..51db3a0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be merge sorted to get
+ * final merge sort result.
+ * This step is specifically for bucketing, it sorts each bucket data separately and write to
+ * temp files.
+ */
+public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
+
+ private SortParameters sortParameters;
+
+ private SortIntermediateFileMerger[] intermediateFileMergers;
+
+ private BucketingInfo bucketingInfo;
+
+ private int sortBufferSize;
+
+ private AtomicLong rowCounter;
+
+ public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter,
+ BucketingInfo bucketingInfo) {
+ this.rowCounter = rowCounter;
+ this.bucketingInfo = bucketingInfo;
+ }
+
+ @Override public void initialize(SortParameters sortParameters) {
+ this.sortParameters = sortParameters;
+ int buffer = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+ sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
+ if (sortBufferSize < 100) {
+ sortBufferSize = 100;
+ }
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+ throws CarbonDataLoadingException {
+ SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
+ intermediateFileMergers =
+ new SortIntermediateFileMerger[sortDataRows.length];
+ try {
+ for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+ SortParameters parameters = sortParameters.getCopy();
+ parameters.setPartitionID(i + "");
+ setTempLocation(parameters);
+ parameters.setBufferSize(sortBufferSize);
+ intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
+ sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
+ sortDataRows[i].initialize();
+ }
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(executorService);
+ final int batchSize = CarbonProperties.getInstance().getBatchSize();
+ try {
+ for (int i = 0; i < iterators.length; i++) {
+ executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
+ this.threadStatusObserver));
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(2, TimeUnit.DAYS);
+ processRowToNextStep(sortDataRows, sortParameters);
+ } catch (Exception e) {
+ checkError();
+ throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+ }
+ checkError();
+ try {
+ for (int i = 0; i < intermediateFileMergers.length; i++) {
+ intermediateFileMergers[i].finish();
+ }
+ } catch (CarbonDataWriterException e) {
+ throw new CarbonDataLoadingException(e);
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+
+ Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
+ for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+ batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
+ }
+
+ return batchIterator;
+ }
+
+ private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+ String[] storeLocation = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+ String.valueOf(sortParameters.getTaskNo()), bucketId,
+ sortParameters.getSegmentId() + "", false, false);
+ // Set the data file location
+ String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
+ CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
+ sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
+ sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
+ sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
+ this.sortParameters.getNoDictionarySortColumn());
+ }
+
+ @Override public void close() {
+ for (int i = 0; i < intermediateFileMergers.length; i++) {
+ intermediateFileMergers[i].close();
+ }
+ }
+
+ /**
+ * Below method will be used to process data to next step
+ */
+ private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
+ throws CarbonDataLoadingException {
+ if (null == sortDataRows || sortDataRows.length == 0) {
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ LOGGER.info("Number of Records was Zero");
+ String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+ LOGGER.info(logMessage);
+ return false;
+ }
+
+ try {
+ for (int i = 0; i < sortDataRows.length; i++) {
+ // start sorting
+ sortDataRows[i].startSorting();
+ }
+ // check any more rows are present
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ return false;
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ private void setTempLocation(SortParameters parameters) {
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(parameters.getDatabaseName(),
+ parameters.getTableName(), parameters.getTaskNo(),
+ parameters.getPartitionID(), parameters.getSegmentId(), false, false);
+ String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+ CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ parameters.setTempFileLocation(tmpLocs);
+ }
+
+ /**
+ * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
+ */
+ private static class SortIteratorThread implements Runnable {
+
+ private Iterator<CarbonRowBatch> iterator;
+
+ private SortDataRows[] sortDataRows;
+
+ private AtomicLong rowCounter;
+
+ private ThreadStatusObserver threadStatusObserver;
+
+ public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows,
+ AtomicLong rowCounter, ThreadStatusObserver observer) {
+ this.iterator = iterator;
+ this.sortDataRows = sortDataRows;
+ this.rowCounter = rowCounter;
+ this.threadStatusObserver = observer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (iterator.hasNext()) {
+ CarbonRowBatch batch = iterator.next();
+ int i = 0;
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ if (row != null) {
+ SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+ synchronized (sortDataRow) {
+ sortDataRow.addRow(row.getData());
+ rowCounter.getAndAdd(1);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e);
+ this.threadStatusObserver.notifyFailed(e);
+ }
+ }
+
+ }
+
+ private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+ private String partitionId;
+
+ private int batchSize;
+
+ private boolean firstRow = true;
+
+ public MergedDataIterator(String partitionId, int batchSize) {
+ this.partitionId = partitionId;
+ this.batchSize = batchSize;
+ }
+
+ private SingleThreadFinalSortFilesMerger finalMerger;
+
+ @Override public boolean hasNext() {
+ if (firstRow) {
+ firstRow = false;
+ finalMerger = getFinalMerger(partitionId);
+ finalMerger.startFinalMerge();
+ }
+ return finalMerger.hasNext();
+ }
+
+ @Override public CarbonRowBatch next() {
+ int counter = 0;
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+ while (finalMerger.hasNext() && counter < batchSize) {
+ rowBatch.addRow(new CarbonRow(finalMerger.next()));
+ counter++;
+ }
+ return rowBatch;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
new file mode 100644
index 0000000..ed35a96
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.util.concurrent.ExecutorService;
+
/**
 * Records the first failure reported by any worker of an {@link ExecutorService} and stops that
 * executor, interrupting the remaining workers.
 */
public class ThreadStatusObserver {

  /**
   * lock object guarding the first-failure hand-off
   */
  private final Object lock = new Object();

  private final ExecutorService executorService;

  /**
   * First reported failure. Volatile so that the unsynchronized fast-path check in
   * {@link #notifyFailed} and readers of {@link #getThrowable} on other threads see the value
   * once it has been written.
   */
  private volatile Throwable throwable;

  public ThreadStatusObserver(ExecutorService executorService) {
    this.executorService = executorService;
  }

  /**
   * Reports a worker failure. Only the first call records its throwable and shuts the executor
   * down. Later calls are ignored, for example from workers that throw because that shutdown
   * interrupted them, so the actual cause of the failure is not overwritten.
   *
   * @param throwable the failure to report
   */
  public void notifyFailed(Throwable throwable) {
    if (null != this.throwable) {
      return;
    }
    synchronized (lock) {
      if (null == this.throwable) {
        // publish the cause before interrupting the other workers, so a worker that inspects
        // getThrowable() while handling the interrupt already sees the original failure
        this.throwable = throwable;
        executorService.shutdownNow();
      }
    }
  }

  /**
   * @return the first reported failure, or null if none has been reported
   */
  public Throwable getThrowable() {
    return throwable;
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..c5579d9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.row.CarbonSortBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * It sorts data in batches and send to the next step.
+ */
+public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
+
+ private SortParameters sortParameters;
+
+ private ExecutorService executorService;
+
+ private AtomicLong rowCounter;
+
+ public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+ this.rowCounter = rowCounter;
+ }
+
+ @Override public void initialize(SortParameters sortParameters) {
+ this.sortParameters = sortParameters;
+
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+ throws CarbonDataLoadingException {
+ this.executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
+ int batchSize = CarbonProperties.getInstance().getBatchSize();
+ final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length,
+ this.threadStatusObserver);
+
+ try {
+ for (int i = 0; i < iterators.length; i++) {
+ executorService.execute(
+ new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter,
+ this.threadStatusObserver));
+ }
+ } catch (Exception e) {
+ checkError();
+ throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+ }
+ checkError();
+ // Creates the iterator to read from merge sorter.
+ Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() {
+
+ @Override public boolean hasNext() {
+ return sortBatchHolder.hasNext();
+ }
+
+ @Override public CarbonSortBatch next() {
+ return new CarbonSortBatch(sortBatchHolder.next());
+ }
+ };
+ return new Iterator[] { batchIterator };
+ }
+
+ @Override public void close() {
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(2, TimeUnit.DAYS);
+ } catch (InterruptedException e) {
+ LOGGER.error(e);
+ }
+ }
+
+ /**
+ * This thread iterates the iterator and adds the rows
+ */
+ private static class SortIteratorThread implements Runnable {
+
+ private Iterator<CarbonRowBatch> iterator;
+
+ private SortBatchHolder sortDataRows;
+
+ private Object[][] buffer;
+
+ private AtomicLong rowCounter;
+
+ private ThreadStatusObserver threadStatusObserver;
+
+ public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows,
+ int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) {
+ this.iterator = iterator;
+ this.sortDataRows = sortDataRows;
+ this.buffer = new Object[batchSize][];
+ this.rowCounter = rowCounter;
+ this.threadStatusObserver = threadStatusObserver;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (iterator.hasNext()) {
+ CarbonRowBatch batch = iterator.next();
+ int i = 0;
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ if (row != null) {
+ buffer[i++] = row.getData();
+ }
+ }
+ if (i > 0) {
+ synchronized (sortDataRows) {
+ sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
+ rowCounter.getAndAdd(i);
+ if (!sortDataRows.getSortDataRow().canAdd()) {
+ sortDataRows.finish(false);
+ sortDataRows.createSortDataRows();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e);
+ this.threadStatusObserver.notifyFailed(e);
+ } finally {
+ sortDataRows.finishThread();
+ }
+ }
+
+ }
+
+ private static class SortBatchHolder
+ extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {
+
+ private SortParameters sortParameters;
+
+ private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+ private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
+
+ private UnsafeSortDataRows sortDataRow;
+
+ private final BlockingQueue<UnsafeSingleThreadFinalSortFilesMerger> mergerQueue;
+
+ private AtomicInteger iteratorCount;
+
+ private int batchCount;
+
+ private ThreadStatusObserver threadStatusObserver;
+
+ private final Object lock = new Object();
+
+ public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
+ ThreadStatusObserver threadStatusObserver) {
+ this.sortParameters = sortParameters.getCopy();
+ this.iteratorCount = new AtomicInteger(numberOfThreads);
+ this.mergerQueue = new LinkedBlockingQueue<>(1);
+ this.threadStatusObserver = threadStatusObserver;
+ createSortDataRows();
+ }
+
+ private void createSortDataRows() {
+ int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ setTempLocation(sortParameters);
+ this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+ sortParameters.getTempFileLocation());
+ unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
+ sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger,
+ inMemoryChunkSizeInMB);
+
+ try {
+ sortDataRow.initialize();
+ } catch (MemoryException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ batchCount++;
+ }
+
+ private void setTempLocation(SortParameters parameters) {
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(parameters.getDatabaseName(),
+ parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
+ parameters.getSegmentId(), false, false);
+ String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
+ File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ parameters.setTempFileLocation(tempDirs);
+ }
+
+ @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
+ try {
+ UnsafeSingleThreadFinalSortFilesMerger unsafeSingleThreadFinalSortFilesMerger =
+ mergerQueue.take();
+ if (unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) {
+ throw new RuntimeException(threadStatusObserver.getThrowable());
+ }
+ return unsafeSingleThreadFinalSortFilesMerger;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public UnsafeSortDataRows getSortDataRow() {
+ return sortDataRow;
+ }
+
+ public void finish(boolean isFinalAttempt) {
+ try {
+ // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
+ // then set stop process to true in the finalmerger instance
+ if (mergerQueue.isEmpty() && threadStatusObserver != null
+ && threadStatusObserver.getThrowable() != null && threadStatusObserver
+ .getThrowable() instanceof CarbonDataLoadingException) {
+ finalMerger.setStopProcess(true);
+ if (isFinalAttempt) {
+ iteratorCount.decrementAndGet();
+ }
+ mergerQueue.put(finalMerger);
+ return;
+ }
+ processRowToNextStep(sortDataRow, sortParameters);
+ unsafeIntermediateFileMerger.finish();
+ List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
+ finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+ unsafeIntermediateFileMerger.getMergedPages());
+ unsafeIntermediateFileMerger.close();
+ if (isFinalAttempt) {
+ iteratorCount.decrementAndGet();
+ }
+ mergerQueue.put(finalMerger);
+ sortDataRow = null;
+ unsafeIntermediateFileMerger = null;
+ finalMerger = null;
+ } catch (CarbonDataWriterException e) {
+ throw new CarbonDataLoadingException(e);
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ } catch (InterruptedException e) {
+ // if fails to put in queue because of interrupted exception, we can offer to free the main
+ // thread from waiting.
+ if (finalMerger != null) {
+ finalMerger.setStopProcess(true);
+ boolean offered = mergerQueue.offer(finalMerger);
+ if (!offered) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ public void finishThread() {
+ synchronized (lock) {
+ if (iteratorCount.get() <= 1) {
+ finish(true);
+ } else {
+ iteratorCount.decrementAndGet();
+ }
+ }
+ }
+
+ public boolean hasNext() {
+ return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
+ }
+
+ /**
+ * Below method will be used to process data to next step
+ */
+ private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
+ throws CarbonDataLoadingException {
+ if (null == sortDataRows) {
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ LOGGER.info("Number of Records was Zero");
+ String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+ LOGGER.info(logMessage);
+ return false;
+ }
+
+ try {
+ // start sorting
+ sortDataRows.startSorting();
+
+ // check any more rows are present
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+ System.currentTimeMillis());
+ return false;
+ } catch (InterruptedException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..1a2f704
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be merge sorted to get
+ * final merge sort result.
+ */
+public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
+
+ private SortParameters sortParameters;
+
+ private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
+
+ private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+ private AtomicLong rowCounter;
+
+ public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+ this.rowCounter = rowCounter;
+ }
+
+ @Override public void initialize(SortParameters sortParameters) {
+ this.sortParameters = sortParameters;
+ unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
+
+ finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+ sortParameters.getTempFileLocation());
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+ throws CarbonDataLoadingException {
+ int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ UnsafeSortDataRows sortDataRow =
+ new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
+ final int batchSize = CarbonProperties.getInstance().getBatchSize();
+ try {
+ sortDataRow.initialize();
+ } catch (MemoryException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(executorService);
+
+ try {
+ for (int i = 0; i < iterators.length; i++) {
+ executorService.execute(
+ new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
+ this.threadStatusObserver));
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(2, TimeUnit.DAYS);
+ processRowToNextStep(sortDataRow, sortParameters);
+ } catch (Exception e) {
+ checkError();
+ throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+ }
+ checkError();
+ try {
+ unsafeIntermediateFileMerger.finish();
+ List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
+ finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+ unsafeIntermediateFileMerger.getMergedPages());
+ } catch (CarbonDataWriterException e) {
+ throw new CarbonDataLoadingException(e);
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+
+ // Creates the iterator to read from merge sorter.
+ Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
+
+ @Override public boolean hasNext() {
+ return finalMerger.hasNext();
+ }
+
+ @Override public CarbonRowBatch next() {
+ int counter = 0;
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+ while (finalMerger.hasNext() && counter < batchSize) {
+ rowBatch.addRow(new CarbonRow(finalMerger.next()));
+ counter++;
+ }
+ return rowBatch;
+ }
+ };
+ return new Iterator[] { batchIterator };
+ }
+
+ @Override public void close() {
+ unsafeIntermediateFileMerger.close();
+ finalMerger.clear();
+ }
+
+ /**
+ * Below method will be used to process data to next step
+ */
+ private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
+ throws CarbonDataLoadingException {
+ if (null == sortDataRows) {
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ LOGGER.info("Number of Records was Zero");
+ String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+ LOGGER.info(logMessage);
+ return false;
+ }
+
+ try {
+ // start sorting
+ sortDataRows.startSorting();
+
+ // check any more rows are present
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ return false;
+ } catch (InterruptedException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ /**
+ * This thread iterates the iterator and adds the rows
+ */
+ private static class SortIteratorThread implements Runnable {
+
+ private Iterator<CarbonRowBatch> iterator;
+
+ private UnsafeSortDataRows sortDataRows;
+
+ private Object[][] buffer;
+
+ private AtomicLong rowCounter;
+
+ private ThreadStatusObserver threadStatusObserver;
+
+ public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+ UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter,
+ ThreadStatusObserver threadStatusObserver) {
+ this.iterator = iterator;
+ this.sortDataRows = sortDataRows;
+ this.buffer = new Object[batchSize][];
+ this.rowCounter = rowCounter;
+ this.threadStatusObserver = threadStatusObserver;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (iterator.hasNext()) {
+ CarbonRowBatch batch = iterator.next();
+ int i = 0;
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ if (row != null) {
+ buffer[i++] = row.getData();
+ }
+ }
+ if (i > 0) {
+ sortDataRows.addRowBatch(buffer, i);
+ rowCounter.getAndAdd(i);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e);
+ this.threadStatusObserver.notifyFailed(e);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..3c48e4d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Reads data in parallel from an array of iterators and merge-sorts it, keeping buckets apart.
+ *
+ * <p>One worker thread drains each input iterator. Every row is routed to the
+ * {@link UnsafeSortDataRows} of its bucket ({@link CarbonRow#bucketNumber}). Each bucket is
+ * sorted and spilled to its own temp location independently. The final merge of each bucket is
+ * exposed as a separate output iterator, so {@link #sort} returns one iterator per bucket.
+ */
+public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(
+          UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
+
+  /** Lower bound, in MB, for the per-bucket in-memory sort chunk size. */
+  private static final int MIN_IN_MEMORY_CHUNK_SIZE_IN_MB = 5;
+
+  private SortParameters sortParameters;
+
+  private final BucketingInfo bucketingInfo;
+
+  /**
+   * @param inputDataFields fields of the incoming rows; not used by this sorter
+   * @param bucketingInfo   bucketing layout; its bucket count decides how many per-bucket
+   *                        sorters and output iterators are created
+   */
+  public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
+      BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+  }
+
+  /**
+   * Sorts the rows of all input iterators, bucket by bucket.
+   *
+   * @param iterators input row batches; each one is drained by its own worker thread
+   * @return one iterator per bucket over that bucket's merged, sorted rows
+   * @throws CarbonDataLoadingException if sorter setup fails, a worker fails, waiting for the
+   *                                    workers fails or times out, or the intermediate merge fails
+   */
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    int numberOfBuckets = bucketingInfo.getNumberOfBuckets();
+    UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[numberOfBuckets];
+    UnsafeIntermediateMerger[] intermediateFileMergers =
+        new UnsafeIntermediateMerger[numberOfBuckets];
+    // The configured sort chunk memory is split across buckets, but never below the minimum
+    int inMemoryChunkSizeInMB = Math.max(MIN_IN_MEMORY_CHUNK_SIZE_IN_MB,
+        CarbonProperties.getInstance().getSortMemoryChunkSizeInMB() / numberOfBuckets);
+    try {
+      for (int i = 0; i < numberOfBuckets; i++) {
+        SortParameters parameters = sortParameters.getCopy();
+        parameters.setPartitionID(String.valueOf(i));
+        setTempLocation(parameters);
+        intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
+        sortDataRows[i] =
+            new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
+        sortDataRows[i].initialize();
+      }
+    } catch (MemoryException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    // newFixedThreadPool rejects a pool size of 0. Keep at least one thread so that an empty
+    // input still goes through the normal (empty) sort and merge path.
+    ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, iterators.length));
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      for (Iterator<CarbonRowBatch> iterator : iterators) {
+        executorService.execute(
+            new SortIteratorThread(iterator, sortDataRows, this.threadStatusObserver));
+      }
+      executorService.shutdown();
+      if (!executorService.awaitTermination(2, TimeUnit.DAYS)) {
+        // Sorting now would race with workers that are still adding rows
+        executorService.shutdownNow();
+        throw new IllegalStateException("Timed out waiting for sort reader threads to finish");
+      }
+      processRowToNextStep(sortDataRows, sortParameters);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Keep the interrupt visible to callers further up the stack
+        Thread.currentThread().interrupt();
+      }
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    checkError();
+    try {
+      for (UnsafeIntermediateMerger intermediateFileMerger : intermediateFileMergers) {
+        intermediateFileMerger.finish();
+      }
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    // Generic array creation is not allowed. The raw array only ever holds MergedDataIterators.
+    @SuppressWarnings("unchecked")
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[numberOfBuckets];
+    for (int i = 0; i < sortDataRows.length; i++) {
+      batchIterator[i] =
+          new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+    }
+
+    return batchIterator;
+  }
+
+  /**
+   * Creates the final merger for one bucket. The merger reads from that bucket's sort temp
+   * folder under the local data folder location.
+   *
+   * @param bucketId bucket index as a string
+   */
+  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), bucketId,
+            sortParameters.getSegmentId() + "", false, false);
+    // Set the data file location
+    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
+        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
+  }
+
+  /** Nothing to release here; the worker executor is already shut down inside {@link #sort}. */
+  @Override public void close() {
+  }
+
+  /**
+   * Starts sorting every bucket's collected rows and records load statistics.
+   *
+   * @param sortDataRows per-bucket sorters; if null or empty, only a zero-record summary is logged
+   * @param parameters   sort parameters used for logging and statistics
+   * @return always {@code false}; the caller does not use the result
+   * @throws CarbonDataLoadingException if any bucket fails to sort
+   */
+  private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (UnsafeSortDataRows sortDataRow : sortDataRows) {
+        // start sorting
+        sortDataRow.startSorting();
+      }
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * Points {@code parameters} at the sort temp folder of its partition (bucket) under the local
+   * data folder location.
+   */
+  private void setTempLocation(SortParameters parameters) {
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
+            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
+            false, false);
+    String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    parameters.setTempFileLocation(tmpLoc);
+  }
+
+  /**
+   * Drains one input iterator and adds each non-null row to the {@link UnsafeSortDataRows} of
+   * the row's bucket. Any exception is logged and reported to the {@link ThreadStatusObserver}.
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private final Iterator<CarbonRowBatch> iterator;
+
+    private final UnsafeSortDataRows[] sortDataRows;
+
+    private final ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+        UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.threadStatusObserver = threadStatusObserver;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+              // Several reader threads may feed the same bucket, so additions are serialized
+              // per bucket sorter.
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      }
+    }
+
+  }
+
+  /**
+   * Iterates one bucket's final merged rows in batches of at most {@code batchSize} rows.
+   * The final merge is started lazily on the first call to {@link #hasNext()} or {@link #next()}.
+   */
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private final String partitionId;
+
+    private final int batchSize;
+
+    private final UnsafeIntermediateMerger intermediateMerger;
+
+    /** Started final merger; null until first access. */
+    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+    public MergedDataIterator(String partitionId, int batchSize,
+        UnsafeIntermediateMerger intermediateMerger) {
+      this.partitionId = partitionId;
+      this.batchSize = batchSize;
+      this.intermediateMerger = intermediateMerger;
+    }
+
+    @Override public boolean hasNext() {
+      return startedFinalMerger().hasNext();
+    }
+
+    /**
+     * @return the next batch of at most {@code batchSize} rows
+     * @throws NoSuchElementException if the bucket has no more rows
+     */
+    @Override public CarbonRowBatch next() {
+      UnsafeSingleThreadFinalSortFilesMerger merger = startedFinalMerger();
+      if (!merger.hasNext()) {
+        throw new NoSuchElementException();
+      }
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+      int counter = 0;
+      while (merger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(merger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+
+    /** Returns the final merger, creating and starting it over this bucket's pages on first use. */
+    private UnsafeSingleThreadFinalSortFilesMerger startedFinalMerger() {
+      if (finalMerger == null) {
+        UnsafeSingleThreadFinalSortFilesMerger merger = getFinalMerger(partitionId);
+        List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
+        merger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+            intermediateMerger.getMergedPages());
+        // Publish only after a successful start so a failed start is not mistaken for "started"
+        finalMerger = merger;
+      }
+      return finalMerger;
+    }
+  }
+}