You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:17 UTC
[30/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
deleted file mode 100644
index 56a32a3..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.util.concurrent.ExecutorService;
-
-public class ThreadStatusObserver {
-
- /**
- * lock object
- */
- private Object lock = new Object();
-
- private ExecutorService executorService;
-
- private Throwable throwable;
-
- public ThreadStatusObserver(ExecutorService executorService) {
- this.executorService = executorService;
- }
-
- public void notifyFailed(Throwable throwable) {
- // Only the first failing thread should call for shutting down the executor service and
- // should assign the throwable object else the actual cause for failure can be overridden as
- // all the running threads will throw interrupted exception on calling shutdownNow and
- // will override the throwable object
- if (null == this.throwable) {
- synchronized (lock) {
- if (null == this.throwable) {
- executorService.shutdownNow();
- this.throwable = throwable;
- }
- }
- }
- }
-
- public Throwable getThrowable() {
- return throwable;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
deleted file mode 100644
index 056c96b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.row.CarbonSortBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * It sorts data in batches and send to the next step.
- */
-public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private ExecutorService executorService;
-
- private AtomicLong rowCounter;
-
- public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
- this.rowCounter = rowCounter;
- }
-
- @Override public void initialize(SortParameters sortParameters) {
- this.sortParameters = sortParameters;
-
- }
-
- @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
- throws CarbonDataLoadingException {
- this.executorService = Executors.newFixedThreadPool(iterators.length);
- this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
- int batchSize = CarbonProperties.getInstance().getBatchSize();
- final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length,
- this.threadStatusObserver);
-
- try {
- for (int i = 0; i < iterators.length; i++) {
- executorService.execute(
- new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter,
- this.threadStatusObserver));
- }
- } catch (Exception e) {
- checkError();
- throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
- }
- checkError();
- // Creates the iterator to read from merge sorter.
- Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() {
-
- @Override public boolean hasNext() {
- return sortBatchHolder.hasNext();
- }
-
- @Override public CarbonSortBatch next() {
- return new CarbonSortBatch(sortBatchHolder.next());
- }
- };
- return new Iterator[] { batchIterator };
- }
-
- @Override public void close() {
- executorService.shutdown();
- try {
- executorService.awaitTermination(2, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- LOGGER.error(e);
- }
- }
-
- /**
- * This thread iterates the iterator and adds the rows
- */
- private static class SortIteratorThread implements Runnable {
-
- private Iterator<CarbonRowBatch> iterator;
-
- private SortBatchHolder sortDataRows;
-
- private Object[][] buffer;
-
- private AtomicLong rowCounter;
-
- private ThreadStatusObserver threadStatusObserver;
-
- public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows,
- int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) {
- this.iterator = iterator;
- this.sortDataRows = sortDataRows;
- this.buffer = new Object[batchSize][];
- this.rowCounter = rowCounter;
- this.threadStatusObserver = threadStatusObserver;
- }
-
- @Override
- public void run() {
- try {
- while (iterator.hasNext()) {
- CarbonRowBatch batch = iterator.next();
- int i = 0;
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- if (row != null) {
- buffer[i++] = row.getData();
- }
- }
- if (i > 0) {
- synchronized (sortDataRows) {
- sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
- rowCounter.getAndAdd(i);
- if (!sortDataRows.getSortDataRow().canAdd()) {
- sortDataRows.finish(false);
- sortDataRows.createSortDataRows();
- }
- }
- }
- }
- } catch (Exception e) {
- LOGGER.error(e);
- this.threadStatusObserver.notifyFailed(e);
- } finally {
- sortDataRows.finishThread();
- }
- }
-
- }
-
- private static class SortBatchHolder
- extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {
-
- private SortParameters sortParameters;
-
- private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
- private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
-
- private UnsafeSortDataRows sortDataRow;
-
- private final BlockingQueue<UnsafeSingleThreadFinalSortFilesMerger> mergerQueue;
-
- private AtomicInteger iteratorCount;
-
- private int batchCount;
-
- private ThreadStatusObserver threadStatusObserver;
-
- private final Object lock = new Object();
-
- public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
- ThreadStatusObserver threadStatusObserver) {
- this.sortParameters = sortParameters.getCopy();
- this.iteratorCount = new AtomicInteger(numberOfThreads);
- this.mergerQueue = new LinkedBlockingQueue<>(1);
- this.threadStatusObserver = threadStatusObserver;
- createSortDataRows();
- }
-
- private void createSortDataRows() {
- int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
- setTempLocation(sortParameters);
- this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
- sortParameters.getTempFileLocation());
- unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
- sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger,
- inMemoryChunkSizeInMB);
-
- try {
- sortDataRow.initialize();
- } catch (MemoryException e) {
- throw new CarbonDataLoadingException(e);
- }
- batchCount++;
- }
-
- private void setTempLocation(SortParameters parameters) {
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(),
- parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
- parameters.getSegmentId(), false, false);
- String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
- File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- parameters.setTempFileLocation(tempDirs);
- }
-
- @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
- try {
- UnsafeSingleThreadFinalSortFilesMerger unsafeSingleThreadFinalSortFilesMerger =
- mergerQueue.take();
- if (unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) {
- throw new RuntimeException(threadStatusObserver.getThrowable());
- }
- return unsafeSingleThreadFinalSortFilesMerger;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- public UnsafeSortDataRows getSortDataRow() {
- return sortDataRow;
- }
-
- public void finish(boolean isFinalAttempt) {
- try {
- // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
- // then set stop process to true in the finalmerger instance
- if (mergerQueue.isEmpty() && threadStatusObserver != null
- && threadStatusObserver.getThrowable() != null && threadStatusObserver
- .getThrowable() instanceof CarbonDataLoadingException) {
- finalMerger.setStopProcess(true);
- if (isFinalAttempt) {
- iteratorCount.decrementAndGet();
- }
- mergerQueue.put(finalMerger);
- return;
- }
- processRowToNextStep(sortDataRow, sortParameters);
- unsafeIntermediateFileMerger.finish();
- List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
- finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
- unsafeIntermediateFileMerger.getMergedPages());
- unsafeIntermediateFileMerger.close();
- if (isFinalAttempt) {
- iteratorCount.decrementAndGet();
- }
- mergerQueue.put(finalMerger);
- sortDataRow = null;
- unsafeIntermediateFileMerger = null;
- finalMerger = null;
- } catch (CarbonDataWriterException e) {
- throw new CarbonDataLoadingException(e);
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- } catch (InterruptedException e) {
- // if fails to put in queue because of interrupted exception, we can offer to free the main
- // thread from waiting.
- if (finalMerger != null) {
- finalMerger.setStopProcess(true);
- boolean offered = mergerQueue.offer(finalMerger);
- if (!offered) {
- throw new CarbonDataLoadingException(e);
- }
- }
- throw new CarbonDataLoadingException(e);
- }
- }
-
- public void finishThread() {
- synchronized (lock) {
- if (iteratorCount.get() <= 1) {
- finish(true);
- } else {
- iteratorCount.decrementAndGet();
- }
- }
- }
-
- public boolean hasNext() {
- return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
- }
-
- /**
- * Below method will be used to process data to next step
- */
- private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
- throws CarbonDataLoadingException {
- if (null == sortDataRows) {
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- return false;
- }
-
- try {
- // start sorting
- sortDataRows.startSorting();
-
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
- System.currentTimeMillis());
- return false;
- } catch (InterruptedException e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
deleted file mode 100644
index a0d43ba..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- */
-public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
-
- private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
- private AtomicLong rowCounter;
-
- public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
- this.rowCounter = rowCounter;
- }
-
- @Override public void initialize(SortParameters sortParameters) {
- this.sortParameters = sortParameters;
- unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
-
- finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
- sortParameters.getTempFileLocation());
- }
-
- @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
- throws CarbonDataLoadingException {
- int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
- UnsafeSortDataRows sortDataRow =
- new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
- final int batchSize = CarbonProperties.getInstance().getBatchSize();
- try {
- sortDataRow.initialize();
- } catch (MemoryException e) {
- throw new CarbonDataLoadingException(e);
- }
- ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
- this.threadStatusObserver = new ThreadStatusObserver(executorService);
-
- try {
- for (int i = 0; i < iterators.length; i++) {
- executorService.execute(
- new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
- this.threadStatusObserver));
- }
- executorService.shutdown();
- executorService.awaitTermination(2, TimeUnit.DAYS);
- processRowToNextStep(sortDataRow, sortParameters);
- } catch (Exception e) {
- checkError();
- throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
- }
- checkError();
- try {
- unsafeIntermediateFileMerger.finish();
- List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
- finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
- unsafeIntermediateFileMerger.getMergedPages());
- } catch (CarbonDataWriterException e) {
- throw new CarbonDataLoadingException(e);
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
-
- // Creates the iterator to read from merge sorter.
- Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
-
- @Override public boolean hasNext() {
- return finalMerger.hasNext();
- }
-
- @Override public CarbonRowBatch next() {
- int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
- while (finalMerger.hasNext() && counter < batchSize) {
- rowBatch.addRow(new CarbonRow(finalMerger.next()));
- counter++;
- }
- return rowBatch;
- }
- };
- return new Iterator[] { batchIterator };
- }
-
- @Override public void close() {
- unsafeIntermediateFileMerger.close();
- finalMerger.clear();
- }
-
- /**
- * Below method will be used to process data to next step
- */
- private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
- throws CarbonDataLoadingException {
- if (null == sortDataRows) {
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- return false;
- }
-
- try {
- // start sorting
- sortDataRows.startSorting();
-
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- return false;
- } catch (InterruptedException e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- /**
- * This thread iterates the iterator and adds the rows
- */
- private static class SortIteratorThread implements Runnable {
-
- private Iterator<CarbonRowBatch> iterator;
-
- private UnsafeSortDataRows sortDataRows;
-
- private Object[][] buffer;
-
- private AtomicLong rowCounter;
-
- private ThreadStatusObserver threadStatusObserver;
-
- public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
- UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter,
- ThreadStatusObserver threadStatusObserver) {
- this.iterator = iterator;
- this.sortDataRows = sortDataRows;
- this.buffer = new Object[batchSize][];
- this.rowCounter = rowCounter;
- this.threadStatusObserver = threadStatusObserver;
- }
-
- @Override
- public void run() {
- try {
- while (iterator.hasNext()) {
- CarbonRowBatch batch = iterator.next();
- int i = 0;
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- if (row != null) {
- buffer[i++] = row.getData();
- }
- }
- if (i > 0) {
- sortDataRows.addRowBatch(buffer, i);
- rowCounter.getAndAdd(i);
- }
- }
- } catch (Exception e) {
- LOGGER.error(e);
- this.threadStatusObserver.notifyFailed(e);
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index 54e0180..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data separately and write to
- * temp files.
- */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(
- UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private BucketingInfo bucketingInfo;
-
- public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
- BucketingInfo bucketingInfo) {
- this.bucketingInfo = bucketingInfo;
- }
-
- @Override public void initialize(SortParameters sortParameters) {
- this.sortParameters = sortParameters;
- }
-
- @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
- throws CarbonDataLoadingException {
- UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
- UnsafeIntermediateMerger[] intermediateFileMergers =
- new UnsafeIntermediateMerger[sortDataRows.length];
- int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
- inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets();
- if (inMemoryChunkSizeInMB < 5) {
- inMemoryChunkSizeInMB = 5;
- }
- try {
- for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
- SortParameters parameters = sortParameters.getCopy();
- parameters.setPartitionID(i + "");
- setTempLocation(parameters);
- intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
- sortDataRows[i] =
- new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
- sortDataRows[i].initialize();
- }
- } catch (MemoryException e) {
- throw new CarbonDataLoadingException(e);
- }
- ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
- this.threadStatusObserver = new ThreadStatusObserver(executorService);
- final int batchSize = CarbonProperties.getInstance().getBatchSize();
- try {
- for (int i = 0; i < iterators.length; i++) {
- executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this
- .threadStatusObserver));
- }
- executorService.shutdown();
- executorService.awaitTermination(2, TimeUnit.DAYS);
- processRowToNextStep(sortDataRows, sortParameters);
- } catch (Exception e) {
- checkError();
- throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
- }
- checkError();
- try {
- for (int i = 0; i < intermediateFileMergers.length; i++) {
- intermediateFileMergers[i].finish();
- }
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
-
- Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
- for (int i = 0; i < sortDataRows.length; i++) {
- batchIterator[i] =
- new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
- }
-
- return batchIterator;
- }
-
- private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
- String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), bucketId,
- sortParameters.getSegmentId() + "", false, false);
- // Set the data file location
- String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
- File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
- }
-
- @Override public void close() {
- }
-
- /**
- * Below method will be used to process data to next step
- */
- private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
- throws CarbonDataLoadingException {
- if (null == sortDataRows || sortDataRows.length == 0) {
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- return false;
- }
-
- try {
- for (int i = 0; i < sortDataRows.length; i++) {
- // start sorting
- sortDataRows[i].startSorting();
- }
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- return false;
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- private void setTempLocation(SortParameters parameters) {
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
- parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
- false, false);
- String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
- CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- parameters.setTempFileLocation(tmpLoc);
- }
-
- /**
- * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows}
- */
- private static class SortIteratorThread implements Runnable {
-
- private Iterator<CarbonRowBatch> iterator;
-
- private UnsafeSortDataRows[] sortDataRows;
-
- private ThreadStatusObserver threadStatusObserver;
-
- public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
- UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) {
- this.iterator = iterator;
- this.sortDataRows = sortDataRows;
- this.threadStatusObserver = threadStatusObserver;
- }
-
- @Override
- public void run() {
- try {
- while (iterator.hasNext()) {
- CarbonRowBatch batch = iterator.next();
- int i = 0;
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- if (row != null) {
- UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
- synchronized (sortDataRow) {
- sortDataRow.addRow(row.getData());
- }
- }
- }
- }
- } catch (Exception e) {
- LOGGER.error(e);
- this.threadStatusObserver.notifyFailed(e);
- }
- }
-
- }
-
- private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
- private String partitionId;
-
- private int batchSize;
-
- private boolean firstRow;
-
- private UnsafeIntermediateMerger intermediateMerger;
-
- public MergedDataIterator(String partitionId, int batchSize,
- UnsafeIntermediateMerger intermediateMerger) {
- this.partitionId = partitionId;
- this.batchSize = batchSize;
- this.intermediateMerger = intermediateMerger;
- this.firstRow = true;
- }
-
- private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
- @Override public boolean hasNext() {
- if (firstRow) {
- firstRow = false;
- finalMerger = getFinalMerger(partitionId);
- List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
- finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
- intermediateMerger.getMergedPages());
- }
- return finalMerger.hasNext();
- }
-
- @Override public CarbonRowBatch next() {
- int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
- while (finalMerger.hasNext() && counter < batchSize) {
- rowBatch.addRow(new CarbonRow(finalMerger.next()));
- counter++;
- }
- return rowBatch;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
deleted file mode 100644
index 8b23437..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.IntPointerBuffer;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/**
- * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
- */
-public class UnsafeCarbonRowPage {
-
- private boolean[] noDictionaryDimensionMapping;
-
- private boolean[] noDictionarySortColumnMapping;
-
- private int dimensionSize;
-
- private int measureSize;
-
- private DataType[] measureDataType;
-
- private long[] nullSetWords;
-
- private IntPointerBuffer buffer;
-
- private int lastSize;
-
- private long sizeToBeUsed;
-
- private MemoryBlock dataBlock;
-
- private boolean saveToDisk;
-
- private MemoryManagerType managerType;
-
- private long taskId;
-
- public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
- boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
- MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
- this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
- this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
- this.dimensionSize = dimensionSize;
- this.measureSize = measureSize;
- this.measureDataType = type;
- this.saveToDisk = saveToDisk;
- this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
- this.taskId = taskId;
- buffer = new IntPointerBuffer(this.taskId);
- this.dataBlock = memoryBlock;
- // TODO Only using 98% of space for safe side.May be we can have different logic.
- sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
- this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
- }
-
- public int addRow(Object[] row) {
- int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
- buffer.set(lastSize);
- lastSize = lastSize + size;
- return size;
- }
-
- private int addRow(Object[] row, long address) {
- if (row == null) {
- throw new RuntimeException("Row is null ??");
- }
- int dimCount = 0;
- int size = 0;
- Object baseObject = dataBlock.getBaseObject();
- for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
- if (noDictionaryDimensionMapping[dimCount]) {
- byte[] col = (byte[]) row[dimCount];
- CarbonUnsafe.getUnsafe()
- .putShort(baseObject, address + size, (short) col.length);
- size += 2;
- CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
- address + size, col.length);
- size += col.length;
- } else {
- int value = (int) row[dimCount];
- CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
- size += 4;
- }
- }
-
- // write complex dimensions here.
- for (; dimCount < dimensionSize; dimCount++) {
- byte[] col = (byte[]) row[dimCount];
- CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
- size += 2;
- CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
- address + size, col.length);
- size += col.length;
- }
- Arrays.fill(nullSetWords, 0);
- int nullSetSize = nullSetWords.length * 8;
- int nullWordLoc = size;
- size += nullSetSize;
- for (int mesCount = 0; mesCount < measureSize; mesCount++) {
- Object value = row[mesCount + dimensionSize];
- if (null != value) {
- switch (measureDataType[mesCount]) {
- case SHORT:
- Short sval = (Short) value;
- CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
- size += 2;
- break;
- case INT:
- Integer ival = (Integer) value;
- CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
- size += 4;
- break;
- case LONG:
- Long val = (Long) value;
- CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
- size += 8;
- break;
- case DOUBLE:
- Double doubleVal = (Double) value;
- CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
- size += 8;
- break;
- case DECIMAL:
- BigDecimal decimalVal = (BigDecimal) value;
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
- CarbonUnsafe.getUnsafe().putShort(baseObject, address + size,
- (short) bigDecimalInBytes.length);
- size += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
- address + size, bigDecimalInBytes.length);
- size += bigDecimalInBytes.length;
- break;
- default:
- throw new IllegalArgumentException("unsupported data type:" +
- measureDataType[mesCount]);
- }
- set(nullSetWords, mesCount);
- } else {
- unset(nullSetWords, mesCount);
- }
- }
- CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
- address + nullWordLoc, nullSetSize);
- return size;
- }
-
- public Object[] getRow(long address, Object[] rowToFill) {
- int dimCount = 0;
- int size = 0;
-
- Object baseObject = dataBlock.getBaseObject();
- for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
- if (noDictionaryDimensionMapping[dimCount]) {
- short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- byte[] col = new byte[aShort];
- size += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
- col.length);
- size += col.length;
- rowToFill[dimCount] = col;
- } else {
- int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
- size += 4;
- rowToFill[dimCount] = anInt;
- }
- }
-
- // write complex dimensions here.
- for (; dimCount < dimensionSize; dimCount++) {
- short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- byte[] col = new byte[aShort];
- size += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
- size += col.length;
- rowToFill[dimCount] = col;
- }
-
- int nullSetSize = nullSetWords.length * 8;
- Arrays.fill(nullSetWords, 0);
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
- nullSetSize);
- size += nullSetSize;
-
- for (int mesCount = 0; mesCount < measureSize; mesCount++) {
- if (isSet(nullSetWords, mesCount)) {
- switch (measureDataType[mesCount]) {
- case SHORT:
- Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- size += 2;
- rowToFill[dimensionSize + mesCount] = sval;
- break;
- case INT:
- Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
- size += 4;
- rowToFill[dimensionSize + mesCount] = ival;
- break;
- case LONG:
- Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
- size += 8;
- rowToFill[dimensionSize + mesCount] = val;
- break;
- case DOUBLE:
- Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
- size += 8;
- rowToFill[dimensionSize + mesCount] = doubleVal;
- break;
- case DECIMAL:
- short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- byte[] bigDecimalInBytes = new byte[aShort];
- size += 2;
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
- CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
- size += bigDecimalInBytes.length;
- rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
- break;
- default:
- throw new IllegalArgumentException("unsupported data type:" +
- measureDataType[mesCount]);
- }
- } else {
- rowToFill[dimensionSize + mesCount] = null;
- }
- }
- return rowToFill;
- }
-
- public void fillRow(long address, DataOutputStream stream) throws IOException {
- int dimCount = 0;
- int size = 0;
-
- Object baseObject = dataBlock.getBaseObject();
- for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
- if (noDictionaryDimensionMapping[dimCount]) {
- short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- byte[] col = new byte[aShort];
- size += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
- col.length);
- size += col.length;
- stream.writeShort(aShort);
- stream.write(col);
- } else {
- int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
- size += 4;
- stream.writeInt(anInt);
- }
- }
-
- // write complex dimensions here.
- for (; dimCount < dimensionSize; dimCount++) {
- short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- byte[] col = new byte[aShort];
- size += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
- size += col.length;
- stream.writeShort(aShort);
- stream.write(col);
- }
-
- int nullSetSize = nullSetWords.length * 8;
- Arrays.fill(nullSetWords, 0);
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
- nullSetSize);
- size += nullSetSize;
- for (int i = 0; i < nullSetWords.length; i++) {
- stream.writeLong(nullSetWords[i]);
- }
-
- for (int mesCount = 0; mesCount < measureSize; mesCount++) {
- if (isSet(nullSetWords, mesCount)) {
- switch (measureDataType[mesCount]) {
- case SHORT:
- short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- size += 2;
- stream.writeShort(sval);
- break;
- case INT:
- int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
- size += 4;
- stream.writeInt(ival);
- break;
- case LONG:
- long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
- size += 8;
- stream.writeLong(val);
- break;
- case DOUBLE:
- double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
- size += 8;
- stream.writeDouble(doubleVal);
- break;
- case DECIMAL:
- short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- byte[] bigDecimalInBytes = new byte[aShort];
- size += 2;
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
- CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
- size += bigDecimalInBytes.length;
- stream.writeShort(aShort);
- stream.write(bigDecimalInBytes);
- break;
- default:
- throw new IllegalArgumentException("unsupported data type:" +
- measureDataType[mesCount]);
- }
- }
- }
- }
-
- public void freeMemory() {
- switch (managerType) {
- case UNSAFE_MEMORY_MANAGER:
- UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
- break;
- default:
- UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
- buffer.freeMemory();
- }
- }
-
- public boolean isSaveToDisk() {
- return saveToDisk;
- }
-
- public IntPointerBuffer getBuffer() {
- return buffer;
- }
-
- public int getUsedSize() {
- return lastSize;
- }
-
- public boolean canAdd() {
- return lastSize < sizeToBeUsed;
- }
-
- public MemoryBlock getDataBlock() {
- return dataBlock;
- }
-
- public static void set(long[] words, int index) {
- int wordOffset = (index >> 6);
- words[wordOffset] |= (1L << index);
- }
-
- public static void unset(long[] words, int index) {
- int wordOffset = (index >> 6);
- words[wordOffset] &= ~(1L << index);
- }
-
- public static boolean isSet(long[] words, int index) {
- int wordOffset = (index >> 6);
- return ((words[wordOffset] & (1L << index)) != 0);
- }
-
- public boolean[] getNoDictionaryDimensionMapping() {
- return noDictionaryDimensionMapping;
- }
-
- public boolean[] getNoDictionarySortColumnMapping() {
- return noDictionarySortColumnMapping;
- }
-
- public void setNewDataBlock(MemoryBlock newMemoryBlock) {
- this.dataBlock = newMemoryBlock;
- this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER;
- }
-
- public enum MemoryManagerType {
- UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
deleted file mode 100644
index dda0d89..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.IntPointerBuffer;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
-import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.sort.TimSort;
-import org.apache.carbondata.processing.newflow.sort.unsafe.sort.UnsafeIntSortDataFormat;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class UnsafeSortDataRows {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeSortDataRows.class.getName());
- /**
- * threadStatusObserver
- */
- private ThreadStatusObserver threadStatusObserver;
- /**
- * executor service for data sort holder
- */
- private ExecutorService dataSorterAndWriterExecutorService;
- /**
- * semaphore which will used for managing sorted data object arrays
- */
-
- private SortParameters parameters;
-
- private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
-
- private UnsafeCarbonRowPage rowPage;
-
- private final Object addRowsLock = new Object();
-
- private long inMemoryChunkSize;
-
- private boolean enableInMemoryIntermediateMerge;
-
- private int bytesAdded;
-
- private long maxSizeAllowed;
-
- /**
- * semaphore which will used for managing sorted data object arrays
- */
- private Semaphore semaphore;
-
- private final long taskId;
-
- public UnsafeSortDataRows(SortParameters parameters,
- UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
- this.parameters = parameters;
-
- this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
-
- // observer of writing file in thread
- this.threadStatusObserver = new ThreadStatusObserver();
- this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
- this.inMemoryChunkSize = inMemoryChunkSize;
- this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
- enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
- CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
-
- this.maxSizeAllowed = parameters.getBatchSortSizeinMb();
- if (maxSizeAllowed <= 0) {
- // If user does not input any memory size, then take half the size of usable memory configured
- // in sort memory size.
- this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
- } else {
- this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;
- }
- }
-
- /**
- * This method will be used to initialize
- */
- public void initialize() throws MemoryException {
- MemoryBlock baseBlock =
- UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
- boolean isMemoryAvailable =
- UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
- if (isMemoryAvailable) {
- UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
- }
- this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
- parameters.getNoDictionarySortColumn(),
- parameters.getDimColCount() + parameters.getComplexDimColCount(),
- parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
- !isMemoryAvailable, taskId);
- // Delete if any older file exists in sort temp folder
- deleteSortLocationIfExists();
-
- // create new sort temp directory
- CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
- this.dataSorterAndWriterExecutorService =
- Executors.newFixedThreadPool(parameters.getNumberOfCores());
- semaphore = new Semaphore(parameters.getNumberOfCores());
- }
-
- public boolean canAdd() {
- return bytesAdded < maxSizeAllowed;
- }
-
- /**
- * This method will be used to add new row
- *
- * @param rowBatch new rowBatch
- * @throws CarbonSortKeyAndGroupByException problem while writing
- */
- public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
- // if record holder list size is equal to sort buffer size then it will
- // sort the list and then write current list data to file
- synchronized (addRowsLock) {
- addBatch(rowBatch, size);
- }
- }
-
- /**
- * This method will be used to add new row
- *
- * @param rowBatch new rowBatch
- * @param size
- * @throws CarbonSortKeyAndGroupByException problem while writing
- */
- public void addRowBatchWithOutSync(Object[][] rowBatch, int size)
- throws CarbonSortKeyAndGroupByException {
- // if record holder list size is equal to sort buffer size then it will
- // sort the list and then write current list data to file
- addBatch(rowBatch, size);
- }
-
- private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
- for (int i = 0; i < size; i++) {
- if (rowPage.canAdd()) {
- bytesAdded += rowPage.addRow(rowBatch[i]);
- } else {
- try {
- if (enableInMemoryIntermediateMerge) {
- unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
- }
- unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
- semaphore.acquire();
- dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock =
- UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
- boolean saveToDisk =
- UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
- if (!saveToDisk) {
- UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
- }
- rowPage = new UnsafeCarbonRowPage(
- parameters.getNoDictionaryDimnesionColumn(),
- parameters.getNoDictionarySortColumn(),
- parameters.getDimColCount() + parameters.getComplexDimColCount(),
- parameters.getMeasureColCount(),
- parameters.getMeasureDataType(),
- memoryBlock,
- saveToDisk, taskId);
- bytesAdded += rowPage.addRow(rowBatch[i]);
- } catch (Exception e) {
- LOGGER.error(
- "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
- throw new CarbonSortKeyAndGroupByException(e);
- }
-
- }
- }
- }
-
- /**
- * This method will be used to add new row
- */
- public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
- // if record holder list size is equal to sort buffer size then it will
- // sort the list and then write current list data to file
- if (rowPage.canAdd()) {
- rowPage.addRow(row);
- } else {
- try {
- if (enableInMemoryIntermediateMerge) {
- unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
- }
- unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
- semaphore.acquire();
- dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock =
- UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
- boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
- if (!saveToDisk) {
- UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
- }
- rowPage = new UnsafeCarbonRowPage(
- parameters.getNoDictionaryDimnesionColumn(),
- parameters.getNoDictionarySortColumn(),
- parameters.getDimColCount(), parameters.getMeasureColCount(),
- parameters.getMeasureDataType(), memoryBlock,
- saveToDisk, taskId);
- rowPage.addRow(row);
- } catch (Exception e) {
- LOGGER.error(
- "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
- throw new CarbonSortKeyAndGroupByException(e);
- }
-
- }
- }
-
- /**
- * Below method will be used to start storing process This method will get
- * all the temp files present in sort temp folder then it will create the
- * record holder heap and then it will read first record from each file and
- * initialize the heap
- *
- * @throws InterruptedException
- */
- public void startSorting() throws InterruptedException {
- LOGGER.info("Unsafe based sorting will be used");
- if (this.rowPage.getUsedSize() > 0) {
- TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
- new UnsafeIntSortDataFormat(rowPage));
- if (parameters.getNumberOfNoDictSortColumns() > 0) {
- timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
- new UnsafeRowComparator(rowPage));
- } else {
- timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
- new UnsafeRowComparatorForNormalDIms(rowPage));
- }
- unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
- } else {
- rowPage.freeMemory();
- }
- startFileBasedMerge();
- }
-
- private void writeData(UnsafeCarbonRowPage rowPage, File file)
- throws CarbonSortKeyAndGroupByException {
- DataOutputStream stream = null;
- try {
- // open stream
- stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
- parameters.getFileWriteBufferSize()));
- int actualSize = rowPage.getBuffer().getActualSize();
- // write number of entries to the file
- stream.writeInt(actualSize);
- for (int i = 0; i < actualSize; i++) {
- rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(),
- stream);
- }
-
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
- } finally {
- // close streams
- CarbonUtil.closeStreams(stream);
- }
- }
-
- /**
- * This method will be used to delete sort temp location is it is exites
- */
- public void deleteSortLocationIfExists() {
- CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
- }
-
- /**
- * Below method will be used to start file based merge
- *
- * @throws InterruptedException
- */
- private void startFileBasedMerge() throws InterruptedException {
- dataSorterAndWriterExecutorService.shutdown();
- dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
- }
-
- /**
- * Observer class for thread execution
- * In case of any failure we need stop all the running thread
- */
- private class ThreadStatusObserver {
- /**
- * Below method will be called if any thread fails during execution
- *
- * @param exception
- * @throws CarbonSortKeyAndGroupByException
- */
- public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
- dataSorterAndWriterExecutorService.shutdownNow();
- unsafeInMemoryIntermediateFileMerger.close();
- parameters.getObserver().setFailed(true);
- LOGGER.error(exception);
- throw new CarbonSortKeyAndGroupByException(exception);
- }
- }
-
- /**
- * This class is responsible for sorting and writing the object
- * array which holds the records equal to given array size
- */
- private class DataSorterAndWriter implements Runnable {
- private UnsafeCarbonRowPage page;
-
- public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) {
- this.page = rowPage;
- }
-
- @Override
- public void run() {
- try {
- long startTime = System.currentTimeMillis();
- TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
- new UnsafeIntSortDataFormat(page));
- // if sort_columns is not none, sort by sort_columns
- if (parameters.getNumberOfNoDictSortColumns() > 0) {
- timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
- new UnsafeRowComparator(page));
- } else {
- timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
- new UnsafeRowComparatorForNormalDIms(page));
- }
- if (page.isSaveToDisk()) {
- // create a new file every time
- // create a new file and pick a temp directory randomly every time
- String tmpDir = parameters.getTempFileLocation()[
- new Random().nextInt(parameters.getTempFileLocation().length)];
- File sortTempFile = new File(
- tmpDir + File.separator + parameters.getTableName()
- + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
- writeData(page, sortTempFile);
- LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
- + " and write is: " + (System.currentTimeMillis() - startTime));
- page.freeMemory();
- // add sort temp filename to and arrayList. When the list size reaches 20 then
- // intermediate merging of sort temp files will be triggered
- unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
- } else {
- // creating a new memory block as size is already allocated
- // so calling lazy memory allocator
- MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE
- .allocateMemoryLazy(taskId, page.getDataBlock().size());
- // copying data from working memory manager to sortmemory manager
- CarbonUnsafe.getUnsafe()
- .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
- newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(),
- page.getDataBlock().size());
- // free unsafememory manager
- page.freeMemory();
- page.setNewDataBlock(newMemoryBlock);
- // add sort temp filename to and arrayList. When the list size reaches 20 then
- // intermediate merging of sort temp files will be triggered
- page.getBuffer().loadToUnsafe();
- unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
- LOGGER.info(
- "Time taken to sort row page with size" + page.getBuffer().getActualSize() + "is: "
- + (System.currentTimeMillis() - startTime));
- }
- } catch (Throwable e) {
- try {
- threadStatusObserver.notifyFailed(e);
- } catch (CarbonSortKeyAndGroupByException ex) {
- LOGGER.error(e);
- }
- } finally {
- semaphore.release();
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
deleted file mode 100644
index c54dcd6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
-
-/**
- * Orders {@link UnsafeCarbonRow}s by their sort columns, reading the values directly from
- * unsafe memory.
- * <p>
- * For each sort column, {@code noDictionarySortColumnMaping} tells how the value is laid out in
- * the row: a no-dictionary column is a 2-byte length followed by that many bytes (compared with
- * {@link UnsafeComparer}); any other column is a 4-byte int (compared numerically).
- */
-public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
-
-  /**
-   * mapping of dictionary and no dictionary of sort_columns; {@code true} marks a
-   * no-dictionary column.
-   */
-  private final boolean[] noDictionarySortColumnMaping;
-
-  /** Base object of the row page's data block, used by the two-argument compare. */
-  private final Object baseObject;
-
-  public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
-    this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-  }
-
-  /**
-   * Compares two rows that both live in the row page this comparator was built for.
-   *
-   * @return a negative value, zero or a positive value as {@code rowL} sorts before, equal to,
-   *     or after {@code rowR}
-   */
-  @Override public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    return compare(rowL, baseObject, rowR, baseObject);
-  }
-
-  /**
-   * Compares two rows that may live in different memory blocks.
-   *
-   * @param rowL left row; its address is resolved against {@code baseObjectL}
-   * @param baseObjectL base object of the memory holding {@code rowL}
-   * @param rowR right row; its address is resolved against {@code baseObjectR}
-   * @param baseObjectR base object of the memory holding {@code rowR}
-   * @return a negative value, zero or a positive value as {@code rowL} sorts before, equal to,
-   *     or after {@code rowR}
-   */
-  public int compare(UnsafeCarbonRow rowL, Object baseObjectL, UnsafeCarbonRow rowR,
-      Object baseObjectR) {
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    // running byte offsets into each row; they diverge once variable-length columns are read
-    int sizeA = 0;
-    int sizeB = 0;
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-      if (isNoDictionary) {
-        byte[] byteArr1 = readLengthPrefixedBytes(baseObjectL, rowA + sizeA);
-        sizeA += 2 + byteArr1.length;
-        byte[] byteArr2 = readLengthPrefixedBytes(baseObjectR, rowB + sizeB);
-        sizeB += 2 + byteArr2.length;
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
-        }
-      } else {
-        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
-        sizeA += 4;
-        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
-        sizeB += 4;
-        // Integer.compare rather than subtraction: dimFieldA - dimFieldB overflows for large
-        // operands of opposite sign and would then report the wrong order.
-        int diff = Integer.compare(dimFieldA, dimFieldB);
-        if (diff != 0) {
-          return diff;
-        }
-      }
-    }
-    return 0;
-  }
-
-  /**
-   * Copies one no-dictionary value out of unsafe memory: a 2-byte length stored at
-   * {@code offset}, followed by that many bytes.
-   * NOTE(review): the length is read as a signed short, so a stored length of 32768 or more
-   * would be negative here and fail the array allocation; confirm the writer caps lengths.
-   */
-  private static byte[] readLengthPrefixedBytes(Object base, long offset) {
-    short length = CarbonUnsafe.getUnsafe().getShort(base, offset);
-    byte[] bytes = new byte[length];
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(base, offset + 2, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-    return bytes;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
deleted file mode 100644
index 53f976f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
-
-/**
- * Orders {@link UnsafeCarbonRow}s whose sort columns are all 4-byte int values stored one after
- * another at the start of the row, reading them directly from the page's unsafe memory.
- */
-public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
-
-  /** Base object of the row page's data block that both compared rows live in. */
-  private final Object baseObject;
-
-  /** Number of leading int columns that take part in the comparison. */
-  private final int numberOfSortColumns;
-
-  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
-  }
-
-  /**
-   * Compares two rows column by column, stopping at the first column that differs.
-   *
-   * @return a negative value, zero or a positive value as {@code rowL} sorts before, equal to,
-   *     or after {@code rowR}
-   */
-  @Override public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    // every column is a fixed 4-byte int, so both rows advance by the same offset
-    long offset = 0;
-    for (int i = 0; i < numberOfSortColumns; i++) {
-      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + offset);
-      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + offset);
-      offset += 4;
-      // Integer.compare rather than subtraction, which can overflow and invert the order
-      int diff = Integer.compare(dimFieldA, dimFieldB);
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return 0;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
deleted file mode 100644
index 9eab940..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Interface for merging temporary sort files / in-memory data.
- * <p>
- * A holder is read like a cursor: {@link #hasNext()} tells whether another row is available,
- * {@link #readRow()} advances to it and {@link #getRow()} returns it. Holders are
- * {@link Comparable} with each other; presumably they compare by their current row so a merger
- * can pick the smallest one (UnsafeFinalMergePageHolder does this). Confirm for other
- * implementations.
- */
-public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
-
- /**
- * @return {@code true} if another row can be fetched with {@link #readRow()}
- */
- boolean hasNext();
-
- /**
- * Advances to the next row so that it becomes available through {@link #getRow()}.
- *
- * @throws CarbonSortKeyAndGroupByException if the row cannot be read
- */
- void readRow() throws CarbonSortKeyAndGroupByException;
-
- /**
- * @return the row loaded by the most recent {@link #readRow()} call
- */
- Object[] getRow();
-
- /**
- * @return the total number of rows this holder provides
- */
- int numberOfRows();
-
- /**
- * Releases the resources held by this holder (UnsafeFinalMergePageHolder, for example, frees
- * the memory of its row pages).
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
deleted file mode 100644
index aff60f6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-/**
- * Handle to a single row stored in unsafe memory; it carries only the row's address.
- */
-public class UnsafeCarbonRow {
-
- /**
- * Location of the row's first byte. Row fields are read with
- * {@code getInt/getShort(baseObject, address + fieldOffset)}, so this is presumably an
- * address/offset relative to the owning data block's base object. TODO: confirm with the code
- * that writes rows.
- */
- public long address;
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
deleted file mode 100644
index 0ec4553..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-/**
- * {@link UnsafeCarbonRow} that also carries an {@link #index}; presumably used while merging
- * rows from several pages (per its name). Confirm against callers.
- */
-public class UnsafeCarbonRowForMerge extends UnsafeCarbonRow {
-
- /**
- * NOTE(review): presumably identifies the row page this row belongs to during a merge. Confirm
- * against callers. As a {@code byte} it can only hold values -128..127.
- */
- public byte index;
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
deleted file mode 100644
index f00dd45..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
-
-public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeFinalMergePageHolder.class.getName());
-
- private int counter;
-
- private int actualSize;
-
- private long[] mergedAddresses;
-
- private byte[] rowPageIndexes;
-
- private UnsafeCarbonRowPage[] rowPages;
-
- private NewRowComparator comparator;
-
- private Object[] currentRow;
-
- private int columnSize;
-
- public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
- boolean[] noDictSortColumnMapping, int columnSize) {
- this.actualSize = merger.getEntryCount();
- this.mergedAddresses = merger.getMergedAddresses();
- this.rowPageIndexes = merger.getRowPageIndexes();
- this.rowPages = merger.getUnsafeCarbonRowPages();
- LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new NewRowComparator(noDictSortColumnMapping);
- this.columnSize = columnSize;
- }
-
- public boolean hasNext() {
- if (counter < actualSize) {
- return true;
- }
- return false;
- }
-
- public void readRow() {
- currentRow = new Object[columnSize];
- rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow);
- counter++;
- }
-
- public Object[] getRow() {
- return currentRow;
- }
-
- @Override public int compareTo(SortTempChunkHolder o) {
- return comparator.compare(currentRow, o.getRow());
- }
-
- @Override public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof UnsafeFinalMergePageHolder)) {
- return false;
- }
-
- UnsafeFinalMergePageHolder o = (UnsafeFinalMergePageHolder) obj;
- return this == o;
- }
-
- @Override public int hashCode() {
- return super.hashCode();
- }
-
- public int numberOfRows() {
- return actualSize;
- }
-
- public void close() {
- for (int i = 0; i < rowPages.length; i++) {
- rowPages[i].freeMemory();
- }
- }
-}