You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/23 13:22:00 UTC
[23/23] incubator-kylin git commit: KYLIN-875 half way
KYLIN-875 half way
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7663fff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7663fff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7663fff4
Branch: refs/heads/KYLIN-875
Commit: 7663fff49aa9cefbb4f22e7193b7e2392b2dbfdd
Parents: ccdc415
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Jul 23 19:21:10 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu Jul 23 19:21:10 2015 +0800
----------------------------------------------------------------------
core-common/pom.xml | 4 +
core-cube/pom.xml | 5 +
.../kylin/cube/gridtable/CubeCodeSystem.java | 287 ++++++++
.../kylin/cube/gridtable/CubeGridTable.java | 70 ++
.../gridtable/CuboidToGridTableMapping.java | 172 +++++
.../inmemcubing/AbstractInMemCubeBuilder.java | 92 +++
.../cube/inmemcubing/ConcurrentDiskStore.java | 342 ++++++++++
.../cube/inmemcubing/DoggedCubeBuilder.java | 434 ++++++++++++
.../kylin/cube/inmemcubing/ICuboidWriter.java | 28 +
.../cube/inmemcubing/InMemCubeBuilder.java | 661 ++++++++++++++++++
.../kylin/cube/inmemcubing/MemDiskStore.java | 679 +++++++++++++++++++
.../kylin/gridtable/GTAggregateScanner.java | 268 ++++++++
.../org/apache/kylin/gridtable/GTBuilder.java | 74 ++
.../apache/kylin/gridtable/GTFilterScanner.java | 100 +++
.../java/org/apache/kylin/gridtable/GTInfo.java | 246 +++++++
.../apache/kylin/gridtable/GTInvertedIndex.java | 205 ++++++
.../gridtable/GTInvertedIndexOfColumn.java | 115 ++++
.../apache/kylin/gridtable/GTRawScanner.java | 111 +++
.../org/apache/kylin/gridtable/GTRecord.java | 285 ++++++++
.../org/apache/kylin/gridtable/GTRowBlock.java | 259 +++++++
.../kylin/gridtable/GTSampleCodeSystem.java | 101 +++
.../org/apache/kylin/gridtable/GTScanRange.java | 83 +++
.../kylin/gridtable/GTScanRangePlanner.java | 486 +++++++++++++
.../apache/kylin/gridtable/GTScanRequest.java | 155 +++++
.../java/org/apache/kylin/gridtable/GTUtil.java | 221 ++++++
.../org/apache/kylin/gridtable/GridTable.java | 61 ++
.../apache/kylin/gridtable/IGTCodeSystem.java | 43 ++
.../apache/kylin/gridtable/IGTComparator.java | 15 +
.../org/apache/kylin/gridtable/IGTScanner.java | 13 +
.../org/apache/kylin/gridtable/IGTStore.java | 26 +
.../apache/kylin/gridtable/UnitTestSupport.java | 101 +++
.../gridtable/memstore/GTSimpleMemStore.java | 112 +++
.../gridtable/AggregationCacheMemSizeTest.java | 214 ++++++
.../kylin/gridtable/DictGridTableTest.java | 381 +++++++++++
.../kylin/gridtable/SimpleGridTableTest.java | 195 ++++++
.../gridtable/SimpleInvertedIndexTest.java | 188 +++++
core-job/pom.xml | 5 +
.../apache/kylin/engine/BuildEngineFactory.java | 52 ++
.../apache/kylin/engine/IBatchCubingEngine.java | 35 +
.../kylin/engine/IStreamingCubingEngine.java | 8 +
.../java/org/apache/kylin/job/JobInstance.java | 504 ++++++++++++++
.../org/apache/kylin/job/JoinedFlatTable.java | 243 +++++++
.../java/org/apache/kylin/job/Scheduler.java | 36 +
.../apache/kylin/job/cmd/BaseCommandOutput.java | 29 +
.../apache/kylin/job/cmd/ICommandOutput.java | 44 ++
.../org/apache/kylin/job/cmd/IJobCommand.java | 32 +
.../java/org/apache/kylin/job/cmd/ShellCmd.java | 104 +++
.../apache/kylin/job/cmd/ShellCmdOutput.java | 84 +++
.../apache/kylin/job/common/OptionsHelper.java | 79 +++
.../kylin/job/common/ShellExecutable.java | 143 ++++
.../kylin/job/constant/ExecutableConstants.java | 80 +++
.../kylin/job/constant/JobStatusEnum.java | 49 ++
.../kylin/job/constant/JobStepCmdTypeEnum.java | 27 +
.../kylin/job/constant/JobStepStatusEnum.java | 51 ++
.../org/apache/kylin/job/dao/ExecutableDao.java | 221 ++++++
.../kylin/job/dao/ExecutableOutputPO.java | 65 ++
.../org/apache/kylin/job/dao/ExecutablePO.java | 78 +++
.../kylin/job/engine/JobEngineConfig.java | 185 +++++
.../kylin/job/exception/ExecuteException.java | 41 ++
.../exception/IllegalStateTranferException.java | 45 ++
.../kylin/job/exception/JobException.java | 58 ++
.../kylin/job/exception/LockException.java | 44 ++
.../job/exception/PersistentException.java | 44 ++
.../kylin/job/exception/SchedulerException.java | 44 ++
.../kylin/job/execution/AbstractExecutable.java | 302 +++++++++
.../kylin/job/execution/ChainedExecutable.java | 31 +
.../job/execution/DefaultChainedExecutable.java | 122 ++++
.../kylin/job/execution/DefaultOutput.java | 97 +++
.../apache/kylin/job/execution/Executable.java | 42 ++
.../kylin/job/execution/ExecutableContext.java | 30 +
.../kylin/job/execution/ExecutableState.java | 82 +++
.../kylin/job/execution/ExecuteResult.java | 54 ++
.../apache/kylin/job/execution/Idempotent.java | 28 +
.../org/apache/kylin/job/execution/Output.java | 34 +
.../job/impl/threadpool/DefaultContext.java | 61 ++
.../job/impl/threadpool/DefaultScheduler.java | 209 ++++++
.../kylin/job/manager/ExecutableManager.java | 318 +++++++++
core-storage/pom.xml | 6 +
.../kylin/storage/ICachableStorageQuery.java | 33 +
.../java/org/apache/kylin/storage/IStorage.java | 28 +
.../org/apache/kylin/storage/IStorageQuery.java | 36 +
.../apache/kylin/storage/StorageContext.java | 170 +++++
.../apache/kylin/storage/StorageFactory2.java | 34 +
.../org/apache/kylin/storage/tuple/Tuple.java | 198 ++++++
.../apache/kylin/storage/tuple/TupleInfo.java | 114 ++++
engine-mr/pom.xml | 9 +
.../kylin/engine/mr/BatchCubingJobBuilder.java | 128 ++++
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 94 +++
.../kylin/engine/mr/BatchMergeJobBuilder.java | 88 +++
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 99 +++
.../kylin/engine/mr/ByteArrayWritable.java | 166 +++++
.../org/apache/kylin/engine/mr/CubingJob.java | 188 +++++
.../org/apache/kylin/engine/mr/IMRInput.java | 69 ++
.../org/apache/kylin/engine/mr/IMROutput.java | 78 +++
.../org/apache/kylin/engine/mr/IMROutput2.java | 88 +++
.../kylin/engine/mr/JobBuilderSupport.java | 168 +++++
.../kylin/engine/mr/MRBatchCubingEngine.java | 47 ++
.../kylin/engine/mr/MRBatchCubingEngine2.java | 47 ++
.../java/org/apache/kylin/engine/mr/MRUtil.java | 55 ++
.../engine/mr/common/AbstractHadoopJob.java | 379 +++++++++++
.../kylin/engine/mr/common/BatchConstants.java | 58 ++
.../common/DefaultSslProtocolSocketFactory.java | 150 ++++
.../mr/common/DefaultX509TrustManager.java | 114 ++++
.../kylin/engine/mr/common/HadoopCmdOutput.java | 105 +++
.../engine/mr/common/HadoopShellExecutable.java | 95 +++
.../engine/mr/common/HadoopStatusChecker.java | 99 +++
.../engine/mr/common/HadoopStatusGetter.java | 114 ++++
.../engine/mr/common/MapReduceExecutable.java | 246 +++++++
.../kylin/engine/mr/steps/BaseCuboidJob.java | 39 ++
.../engine/mr/steps/BaseCuboidMapperBase.java | 205 ++++++
.../engine/mr/steps/CreateDictionaryJob.java | 75 ++
.../apache/kylin/engine/mr/steps/CuboidJob.java | 200 ++++++
.../kylin/engine/mr/steps/CuboidReducer.java | 101 +++
.../mr/steps/FactDistinctColumnsCombiner.java | 63 ++
.../engine/mr/steps/FactDistinctColumnsJob.java | 132 ++++
.../mr/steps/FactDistinctColumnsMapperBase.java | 89 +++
.../mr/steps/FactDistinctColumnsReducer.java | 224 ++++++
.../mr/steps/FactDistinctHiveColumnsMapper.java | 176 +++++
.../engine/mr/steps/HiveToBaseCuboidMapper.java | 69 ++
.../kylin/engine/mr/steps/InMemCuboidJob.java | 103 +++
.../engine/mr/steps/InMemCuboidMapper.java | 122 ++++
.../engine/mr/steps/InMemCuboidReducer.java | 95 +++
.../engine/mr/steps/KeyDistributionJob.java | 157 +++++
.../engine/mr/steps/KeyDistributionMapper.java | 124 ++++
.../engine/mr/steps/KeyDistributionReducer.java | 113 +++
.../mr/steps/MapContextGTRecordWriter.java | 96 +++
.../mr/steps/MergeCuboidFromStorageJob.java | 91 +++
.../mr/steps/MergeCuboidFromStorageMapper.java | 197 ++++++
.../kylin/engine/mr/steps/MergeCuboidJob.java | 100 +++
.../engine/mr/steps/MergeCuboidMapper.java | 192 ++++++
.../engine/mr/steps/MergeDictionaryStep.java | 197 ++++++
.../engine/mr/steps/MergeStatisticsStep.java | 188 +++++
.../engine/mr/steps/MetadataCleanupJob.java | 161 +++++
.../kylin/engine/mr/steps/NDCuboidJob.java | 39 ++
.../kylin/engine/mr/steps/NDCuboidMapper.java | 142 ++++
.../mr/steps/RangeKeyDistributionJob.java | 109 +++
.../mr/steps/RangeKeyDistributionMapper.java | 71 ++
.../mr/steps/RangeKeyDistributionReducer.java | 106 +++
.../mr/steps/RowKeyDistributionCheckerJob.java | 97 +++
.../steps/RowKeyDistributionCheckerMapper.java | 107 +++
.../steps/RowKeyDistributionCheckerReducer.java | 51 ++
.../engine/mr/steps/SaveStatisticsStep.java | 110 +++
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 101 +++
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 136 ++++
.../apache/kylin/engine/BuildEngineFactory.java | 53 --
.../apache/kylin/engine/IBatchCubingEngine.java | 35 -
.../kylin/engine/IStreamingCubingEngine.java | 8 -
.../kylin/engine/mr/BatchCubingJobBuilder.java | 128 ----
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 94 ---
.../kylin/engine/mr/BatchMergeJobBuilder.java | 88 ---
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 98 ---
.../kylin/engine/mr/ByteArrayWritable.java | 166 -----
.../org/apache/kylin/engine/mr/CubingJob.java | 188 -----
.../org/apache/kylin/engine/mr/IMRInput.java | 69 --
.../org/apache/kylin/engine/mr/IMROutput.java | 78 ---
.../org/apache/kylin/engine/mr/IMROutput2.java | 88 ---
.../kylin/engine/mr/JobBuilderSupport.java | 168 -----
.../kylin/engine/mr/MRBatchCubingEngine.java | 47 --
.../kylin/engine/mr/MRBatchCubingEngine2.java | 47 --
.../java/org/apache/kylin/engine/mr/MRUtil.java | 55 --
.../kylin/engine/mr/steps/InMemCuboidJob.java | 103 ---
.../engine/mr/steps/InMemCuboidMapper.java | 122 ----
.../engine/mr/steps/InMemCuboidReducer.java | 95 ---
.../mr/steps/MapContextGTRecordWriter.java | 96 ---
.../mr/steps/MergeCuboidFromStorageJob.java | 95 ---
.../mr/steps/MergeCuboidFromStorageMapper.java | 197 ------
.../engine/mr/steps/MergeDictionaryStep.java | 197 ------
.../engine/mr/steps/MergeStatisticsStep.java | 189 ------
.../engine/mr/steps/SaveStatisticsStep.java | 110 ---
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 101 ---
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 136 ----
.../apache/kylin/job/CubeMetadataUpgrade.java | 22 +-
.../java/org/apache/kylin/job/JobInstance.java | 504 --------------
.../org/apache/kylin/job/JoinedFlatTable.java | 236 -------
.../java/org/apache/kylin/job/Scheduler.java | 36 -
.../apache/kylin/job/cmd/BaseCommandOutput.java | 29 -
.../apache/kylin/job/cmd/ICommandOutput.java | 44 --
.../org/apache/kylin/job/cmd/IJobCommand.java | 32 -
.../java/org/apache/kylin/job/cmd/ShellCmd.java | 104 ---
.../apache/kylin/job/cmd/ShellCmdOutput.java | 84 ---
.../kylin/job/common/HadoopCmdOutput.java | 105 ---
.../kylin/job/common/HadoopShellExecutable.java | 97 ---
.../apache/kylin/job/common/HqlExecutable.java | 106 ---
.../kylin/job/common/MapReduceExecutable.java | 248 -------
.../kylin/job/common/ShellExecutable.java | 143 ----
.../kylin/job/constant/BatchConstants.java | 58 --
.../kylin/job/constant/ExecutableConstants.java | 80 ---
.../kylin/job/constant/JobStatusEnum.java | 49 --
.../kylin/job/constant/JobStepCmdTypeEnum.java | 27 -
.../kylin/job/constant/JobStepStatusEnum.java | 51 --
.../org/apache/kylin/job/dao/ExecutableDao.java | 221 ------
.../kylin/job/dao/ExecutableOutputPO.java | 65 --
.../org/apache/kylin/job/dao/ExecutablePO.java | 78 ---
.../job/deployment/HbaseConfigPrinterCLI.java | 3 +-
.../kylin/job/engine/JobEngineConfig.java | 184 -----
.../kylin/job/exception/ExecuteException.java | 41 --
.../exception/IllegalStateTranferException.java | 45 --
.../kylin/job/exception/JobException.java | 58 --
.../kylin/job/exception/LockException.java | 44 --
.../job/exception/PersistentException.java | 44 --
.../kylin/job/exception/SchedulerException.java | 44 --
.../kylin/job/execution/AbstractExecutable.java | 302 ---------
.../kylin/job/execution/ChainedExecutable.java | 31 -
.../job/execution/DefaultChainedExecutable.java | 122 ----
.../kylin/job/execution/DefaultOutput.java | 97 ---
.../apache/kylin/job/execution/Executable.java | 42 --
.../kylin/job/execution/ExecutableContext.java | 30 -
.../kylin/job/execution/ExecutableState.java | 82 ---
.../kylin/job/execution/ExecuteResult.java | 54 --
.../apache/kylin/job/execution/Idempotent.java | 28 -
.../org/apache/kylin/job/execution/Output.java | 34 -
.../kylin/job/hadoop/AbstractHadoopJob.java | 401 -----------
.../cardinality/ColumnCardinalityMapper.java | 4 +-
.../cardinality/HiveColumnCardinalityJob.java | 4 +-
.../HiveColumnCardinalityUpdateJob.java | 3 +-
.../kylin/job/hadoop/cube/BaseCuboidJob.java | 39 --
.../job/hadoop/cube/BaseCuboidMapperBase.java | 205 ------
.../kylin/job/hadoop/cube/CubeHFileJob.java | 106 ---
.../kylin/job/hadoop/cube/CubeHFileMapper.java | 99 ---
.../apache/kylin/job/hadoop/cube/CuboidJob.java | 200 ------
.../kylin/job/hadoop/cube/CuboidReducer.java | 101 ---
.../cube/FactDistinctColumnsCombiner.java | 63 --
.../job/hadoop/cube/FactDistinctColumnsJob.java | 132 ----
.../cube/FactDistinctColumnsMapperBase.java | 89 ---
.../hadoop/cube/FactDistinctColumnsReducer.java | 224 ------
.../cube/FactDistinctHiveColumnsMapper.java | 176 -----
.../job/hadoop/cube/HiveToBaseCuboidMapper.java | 69 --
.../job/hadoop/cube/KeyDistributionJob.java | 157 -----
.../job/hadoop/cube/KeyDistributionMapper.java | 124 ----
.../job/hadoop/cube/KeyDistributionReducer.java | 113 ---
.../kylin/job/hadoop/cube/KeyValueCreator.java | 104 ---
.../kylin/job/hadoop/cube/MergeCuboidJob.java | 101 ---
.../job/hadoop/cube/MergeCuboidMapper.java | 192 ------
.../job/hadoop/cube/MetadataCleanupJob.java | 160 -----
.../kylin/job/hadoop/cube/NDCuboidJob.java | 39 --
.../kylin/job/hadoop/cube/NDCuboidMapper.java | 142 ----
.../job/hadoop/cube/NewBaseCuboidMapper.java | 4 +-
.../job/hadoop/cube/OrphanHBaseCleanJob.java | 2 +-
.../hadoop/cube/RangeKeyDistributionJob.java | 110 ---
.../hadoop/cube/RangeKeyDistributionMapper.java | 71 --
.../cube/RangeKeyDistributionReducer.java | 106 ---
.../cube/RowKeyDistributionCheckerJob.java | 98 ---
.../cube/RowKeyDistributionCheckerMapper.java | 107 ---
.../cube/RowKeyDistributionCheckerReducer.java | 51 --
.../job/hadoop/cube/StorageCleanupJob.java | 2 +-
.../job/hadoop/dict/CreateDictionaryJob.java | 75 --
.../dict/CreateInvertedIndexDictionaryJob.java | 2 +-
.../kylin/job/hadoop/hbase/BulkLoadJob.java | 101 ---
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 314 ---------
.../kylin/job/hadoop/hbase/CubeHTableUtil.java | 84 ---
.../job/hadoop/hive/SqlHiveDataTypeMapping.java | 33 -
.../job/hadoop/invertedindex/IIBulkLoadJob.java | 2 +-
.../hadoop/invertedindex/IICreateHFileJob.java | 2 +-
.../hadoop/invertedindex/IICreateHTableJob.java | 6 +-
.../invertedindex/IIDistinctColumnsJob.java | 4 +-
.../invertedindex/IIDistinctColumnsReducer.java | 2 +-
.../hadoop/invertedindex/InvertedIndexJob.java | 27 +-
.../invertedindex/InvertedIndexMapper.java | 4 +-
.../invertedindex/InvertedIndexPartitioner.java | 4 +-
.../invertedindex/InvertedIndexReducer.java | 4 +-
.../invertedindex/RandomKeyDistributionJob.java | 5 +-
.../RandomKeyDistributionMapper.java | 2 +-
.../RandomKeyDistributionReducer.java | 2 +-
.../job/impl/threadpool/DefaultContext.java | 61 --
.../job/impl/threadpool/DefaultScheduler.java | 209 ------
.../inmemcubing/AbstractInMemCubeBuilder.java | 92 ---
.../job/inmemcubing/ConcurrentDiskStore.java | 342 ----------
.../job/inmemcubing/DoggedCubeBuilder.java | 427 ------------
.../kylin/job/inmemcubing/ICuboidWriter.java | 28 -
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 661 ------------------
.../kylin/job/inmemcubing/MemDiskStore.java | 679 -------------------
.../kylin/job/invertedindex/IIJobBuilder.java | 4 +-
.../kylin/job/manager/ExecutableManager.java | 318 ---------
.../kylin/job/streaming/CubeStreamConsumer.java | 16 +-
.../apache/kylin/job/tools/CleanHtableCLI.java | 3 +-
.../tools/DefaultSslProtocolSocketFactory.java | 150 ----
.../job/tools/DefaultX509TrustManager.java | 114 ----
.../kylin/job/tools/DeployCoprocessorCLI.java | 317 ---------
.../kylin/job/tools/HadoopStatusChecker.java | 99 ---
.../kylin/job/tools/HadoopStatusGetter.java | 116 ----
.../kylin/job/tools/HtableAlterMetadataCLI.java | 2 +-
.../kylin/job/tools/LZOSupportnessChecker.java | 46 --
.../apache/kylin/job/tools/OptionsHelper.java | 81 ---
.../apache/kylin/source/hive/HiveMRInput.java | 174 -----
.../source/hive/HiveSourceTableLoader.java | 155 -----
.../org/apache/kylin/source/hive/HiveTable.java | 99 ---
.../kylin/source/hive/HiveTableReader.java | 176 -----
.../kylin/source/hive/HiveTableSource.java | 43 --
.../apache/kylin/storage/StorageFactory2.java | 34 -
.../kylin/storage/hbase/HBaseMROutput.java | 60 --
.../kylin/storage/hbase/HBaseMROutput2.java | 298 --------
.../kylin/storage/hbase/HBaseMRSteps.java | 138 ----
.../kylin/storage/hbase/HBaseStorage.java | 46 --
.../storage/hbase/InMemKeyValueCreator.java | 73 --
.../apache/kylin/storage/hbase/MergeGCStep.java | 121 ----
.../kylin/job/BuildCubeWithEngineTest.java | 2 +-
.../apache/kylin/job/BuildIIWithEngineTest.java | 2 +-
.../java/org/apache/kylin/job/DeployUtil.java | 22 +-
.../job/hadoop/cube/CubeHFileMapper2Test.java | 2 +-
.../job/hadoop/cube/CubeHFileMapperTest.java | 4 +-
.../kylin/job/hadoop/cube/CubeReducerTest.java | 3 +-
.../HiveToBaseCuboidMapperPerformanceTest.java | 1 +
.../job/hadoop/cube/MergeCuboidJobTest.java | 2 +-
.../job/hadoop/cube/MergeCuboidMapperTest.java | 1 +
.../kylin/job/hadoop/cube/MockupMapContext.java | 3 +-
.../kylin/job/hadoop/cube/NDCuboidJobTest.java | 2 +-
.../job/hadoop/cube/NDCuboidMapperTest.java | 6 +-
.../cube/RandomKeyDistributionMapperTest.java | 3 +-
.../cube/RandomKeyDistributionReducerTest.java | 2 +-
.../cube/RangeKeyDistributionJobTest.java | 2 +-
.../cube/RangeKeyDistributionMapperTest.java | 1 +
.../cube/RangeKeyDistributionReducerTest.java | 1 +
.../cubev2/FactDistinctColumnsReducerTest.java | 2 +-
.../job/hadoop/hbase/CreateHTableTest.java | 2 +-
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
.../inmemcubing/ConcurrentDiskStoreTest.java | 15 +-
.../DoggedCubeBuilderStressTest.java | 4 +-
.../job/inmemcubing/DoggedCubeBuilderTest.java | 5 +-
.../job/inmemcubing/InMemCubeBuilderTest.java | 4 +-
.../kylin/job/inmemcubing/MemDiskStoreTest.java | 15 +-
.../apache/kylin/rest/service/CubeService.java | 4 +-
.../apache/kylin/rest/service/JobService.java | 4 +-
source-hive/pom.xml | 5 +
.../apache/kylin/source/hive/HiveMRInput.java | 174 +++++
.../source/hive/HiveSourceTableLoader.java | 155 +++++
.../org/apache/kylin/source/hive/HiveTable.java | 99 +++
.../kylin/source/hive/HiveTableReader.java | 176 +++++
.../kylin/source/hive/HiveTableSource.java | 43 ++
.../apache/kylin/source/hive/HqlExecutable.java | 105 +++
.../job/spark/AbstractSparkApplication.java | 2 +-
.../apache/kylin/job/spark/SparkCountDemo.java | 3 +-
.../apache/kylin/job/spark/SparkHelloWorld.java | 2 +-
.../apache/kylin/job/spark/SparkHiveDemo.java | 2 +-
storage-hbase/pom.xml | 5 +
.../kylin/storage/hbase/HBaseMROutput.java | 61 ++
.../kylin/storage/hbase/HBaseMROutput2.java | 297 ++++++++
.../hbase/HBaseRegionSizeCalculator.java | 128 ----
.../kylin/storage/hbase/HBaseStorage.java | 46 ++
.../kylin/storage/hbase/RowValueDecoder.java | 131 ----
.../kylin/storage/hbase/ZookeeperJobLock.java | 83 ---
.../kylin/storage/hbase/steps/BulkLoadJob.java | 99 +++
.../storage/hbase/steps/CreateHTableJob.java | 314 +++++++++
.../kylin/storage/hbase/steps/CubeHFileJob.java | 106 +++
.../storage/hbase/steps/CubeHFileMapper.java | 99 +++
.../storage/hbase/steps/CubeHTableUtil.java | 85 +++
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 135 ++++
.../hbase/steps/InMemKeyValueCreator.java | 73 ++
.../storage/hbase/steps/KeyValueCreator.java | 104 +++
.../kylin/storage/hbase/steps/MergeGCStep.java | 121 ++++
.../storage/hbase/steps/RowValueDecoder.java | 131 ++++
.../hbase/util/DeployCoprocessorCLI.java | 317 +++++++++
.../hbase/util/HBaseRegionSizeCalculator.java | 128 ++++
.../hbase/util/LZOSupportnessChecker.java | 46 ++
.../storage/hbase/util/ZookeeperJobLock.java | 84 +++
.../storage/hbase/RowValueDecoderTest.java | 1 +
storage/pom.xml | 5 -
.../kylin/storage/ICachableStorageQuery.java | 33 -
.../java/org/apache/kylin/storage/IStorage.java | 28 -
.../org/apache/kylin/storage/IStorageQuery.java | 36 -
.../apache/kylin/storage/StorageContext.java | 170 -----
.../kylin/storage/cube/CubeCodeSystem.java | 287 --------
.../kylin/storage/cube/CubeGridTable.java | 70 --
.../storage/cube/CubeHBaseReadonlyStore.java | 12 +-
.../apache/kylin/storage/cube/CubeScanner.java | 18 +-
.../kylin/storage/cube/CubeTupleConverter.java | 2 +-
.../storage/cube/CuboidToGridTableMapping.java | 172 -----
.../cube/SequentialCubeTupleIterator.java | 2 +-
.../storage/gridtable/GTAggregateScanner.java | 268 --------
.../kylin/storage/gridtable/GTBuilder.java | 74 --
.../kylin/storage/gridtable/GTComboStore.java | 115 ----
.../storage/gridtable/GTFilterScanner.java | 100 ---
.../apache/kylin/storage/gridtable/GTInfo.java | 246 -------
.../storage/gridtable/GTInvertedIndex.java | 205 ------
.../gridtable/GTInvertedIndexOfColumn.java | 115 ----
.../kylin/storage/gridtable/GTRawScanner.java | 111 ---
.../kylin/storage/gridtable/GTRecord.java | 285 --------
.../kylin/storage/gridtable/GTRowBlock.java | 259 -------
.../storage/gridtable/GTSampleCodeSystem.java | 101 ---
.../kylin/storage/gridtable/GTScanRange.java | 83 ---
.../storage/gridtable/GTScanRangePlanner.java | 486 -------------
.../kylin/storage/gridtable/GTScanRequest.java | 155 -----
.../apache/kylin/storage/gridtable/GTUtil.java | 221 ------
.../kylin/storage/gridtable/GridTable.java | 61 --
.../kylin/storage/gridtable/IGTCodeSystem.java | 43 --
.../kylin/storage/gridtable/IGTComparator.java | 15 -
.../kylin/storage/gridtable/IGTScanner.java | 13 -
.../kylin/storage/gridtable/IGTStore.java | 26 -
.../storage/gridtable/UnitTestSupport.java | 101 ---
.../storage/gridtable/diskstore/FileSystem.java | 23 -
.../gridtable/diskstore/GTDiskStore.java | 179 -----
.../gridtable/diskstore/HadoopFileSystem.java | 97 ---
.../gridtable/diskstore/LocalFileSystem.java | 67 --
.../gridtable/memstore/GTSimpleMemStore.java | 112 ---
.../storage/hbase/CubeSegmentTupleIterator.java | 1 +
.../kylin/storage/hbase/CubeStorageQuery.java | 1 +
.../kylin/storage/hbase/CubeTupleConverter.java | 1 +
.../hbase/SerializedHBaseTupleIterator.java | 1 +
.../observer/ObserverAggregators.java | 2 +-
.../coprocessor/observer/ObserverEnabler.java | 2 +-
.../org/apache/kylin/storage/tuple/Tuple.java | 198 ------
.../apache/kylin/storage/tuple/TupleInfo.java | 114 ----
.../gridtable/AggregationCacheMemSizeTest.java | 213 ------
.../storage/gridtable/DictGridTableTest.java | 381 -----------
.../storage/gridtable/SimpleGridTableTest.java | 188 -----
.../gridtable/SimpleInvertedIndexTest.java | 183 -----
405 files changed, 21539 insertions(+), 21979 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index adb189e..06db315 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -56,6 +56,10 @@
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/pom.xml
----------------------------------------------------------------------
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index 2b7177a..b74b69b 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -44,6 +44,11 @@
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.n3twork.druid</groupId>
+ <artifactId>extendedset</artifactId>
+ </dependency>
+
<!-- Env & Test -->
<dependency>
<groupId>org.apache.kylin</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
new file mode 100644
index 0000000..7a8c364
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -0,0 +1,287 @@
+package org.apache.kylin.cube.gridtable;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.IGTCodeSystem;
+import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IGTCodeSystem} for cube grid tables. {@link #init(GTInfo)} picks one serializer per column: a
+ * {@link DictionarySerializer} when the column has a dictionary, else a {@link FixLenSerializer} when it has a fixed
+ * length, else {@code DataTypeSerializer.create} for the column's data type (metrics). Originally by shaoshi, 3/23/15.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" }) // raw Dictionary / DataTypeSerializer types are used throughout
+public class CubeCodeSystem implements IGTCodeSystem {
+ private static final Logger logger = LoggerFactory.getLogger(CubeCodeSystem.class);
+
+ // ================================ state ================================
+
+ private GTInfo info; // set by init(GTInfo)
+ private Map<Integer, Dictionary> dictionaryMap; // column index ==> dictionary of column
+ private Map<Integer, Integer> fixLenMap; // column index ==> fixed length of column
+ private Map<Integer, Integer> dependentMetricsMap; // child metric column index ==> parent column index; may be null
+ private IGTComparator comparator; // created by init(GTInfo)
+ private DataTypeSerializer[] serializers; // one per column, chosen by init(GTInfo)
+
+ public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap) { // no fixed-length columns, no dependent metrics
+ this(dictionaryMap, Collections.<Integer, Integer>emptyMap(), Collections.<Integer, Integer>emptyMap());
+ }
+
+ public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap, Map<Integer, Integer> fixLenMap, Map<Integer, Integer> dependentMetricsMap) {
+ this.dictionaryMap = dictionaryMap;
+ this.fixLenMap = fixLenMap;
+ this.dependentMetricsMap = dependentMetricsMap;
+ }
+
+ @Override
+ public void init(GTInfo info) {
+ this.info = info;
+
+ serializers = new DataTypeSerializer[info.getColumnCount()];
+ for (int i = 0; i < info.getColumnCount(); i++) {
+ // dimension with a dictionary: encoded as its dictionary id
+ if (dictionaryMap.get(i) != null) {
+ serializers[i] = new DictionarySerializer(dictionaryMap.get(i));
+ }
+ // dimension without a dictionary but with a fixed length: padded string bytes
+ else if (fixLenMap.get(i) != null) {
+ serializers[i] = new FixLenSerializer(fixLenMap.get(i));
+ }
+ // everything else (metrics): default serializer for the column's data type
+ else {
+ serializers[i] = DataTypeSerializer.create(info.getColumnType(i));
+ }
+ }
+
+ this.comparator = new IGTComparator() {
+ @Override
+ public boolean isNull(ByteArray code) {
+ // null iff every byte equals Dictionary.NULL (all 0xff); a zero-length code also counts as null
+ byte[] array = code.array();
+ for (int i = 0, j = code.offset(), n = code.length(); i < n; i++, j++) {
+ if (array[j] != Dictionary.NULL)
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int compare(ByteArray code1, ByteArray code2) {
+ return code1.compareTo(code2); // ordering delegated to ByteArray.compareTo
+ }
+ };
+ }
+
+ @Override
+ public IGTComparator getComparator() {
+ return comparator;
+ }
+
+ @Override
+ public int codeLength(int col, ByteBuffer buf) {
+ return serializers[col].peekLength(buf);
+ }
+
+ @Override
+ public int maxCodeLength(int col) {
+ return serializers[col].maxLength();
+ }
+
+ @Override
+ public void encodeColumnValue(int col, Object value, ByteBuffer buf) {
+ encodeColumnValue(col, value, 0, buf); // NOTE(review): presumably roundingFlag 0 means exact lookup - confirm in Dictionary
+ }
+
+ @Override
+ public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
+ // on ClassCastException a String value is converted via valueOf and retried once; the author notes encoding only happens once at build time, so the extra cost is OK
+ DataTypeSerializer serializer = serializers[col];
+ try {
+ if (serializer instanceof DictionarySerializer) {
+ ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf); // only dictionary columns use roundingFlag
+ } else {
+ serializer.serialize(value, buf);
+ }
+ } catch (ClassCastException ex) { // NOTE(review): buf position is not reset before the retry; assumes serializers fail before writing
+ // the serializer rejected the value's type; if it is a String, convert it with valueOf and retry
+ try {
+ if (value instanceof String) {
+ Object converted = serializer.valueOf((String) value);
+ if ((converted instanceof String) == false) { // a String result would only repeat the same failure
+ encodeColumnValue(col, converted, roundingFlag, buf);
+ return;
+ }
+ }
+ } catch (Throwable e) { // conversion or retry failed: log it, then rethrow the original exception below
+ logger.error("Fail to encode value '" + value + "'", e);
+ }
+ throw ex; // always the original ClassCastException
+ }
+ }
+
+ @Override
+ public Object decodeColumnValue(int col, ByteBuffer buf) {
+ return serializers[col].deserialize(buf);
+ }
+
+ @Override
+ public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
+ assert columns.trueBitCount() == aggrFunctions.length; // one aggregate function per selected column, in order
+
+ MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length];
+ for (int i = 0; i < result.length; i++) {
+ int col = columns.trueBitAt(i);
+ result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString());
+ }
+
+ // holistic distinct count: link each selected child metric's aggregator to its parent's; the parent must be selected too
+ if (dependentMetricsMap != null) {
+ for (Integer child : dependentMetricsMap.keySet()) {
+ if (columns.get(child)) {
+ Integer parent = dependentMetricsMap.get(child);
+ if (columns.get(parent) == false)
+ throw new IllegalStateException(); // child selected without its parent
+
+ int childIdx = columns.trueBitIndexOf(child); // presumably the inverse of trueBitAt, i.e. an index into result
+ int parentIdx = columns.trueBitIndexOf(parent);
+ result[childIdx].setDependentAggregator(result[parentIdx]);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ static class DictionarySerializer extends DataTypeSerializer { // value <-> dictionary id, stored as getSizeOfId() unsigned bytes
+ private Dictionary dictionary;
+
+ DictionarySerializer(Dictionary dictionary) {
+ this.dictionary = dictionary;
+ }
+
+ public void serializeWithRounding(Object value, int roundingFlag, ByteBuffer buf) {
+ int id = dictionary.getIdFromValue(value, roundingFlag);
+ BytesUtil.writeUnsigned(id, dictionary.getSizeOfId(), buf);
+ }
+
+ @Override
+ public void serialize(Object value, ByteBuffer buf) {
+ int id = dictionary.getIdFromValue(value);
+ BytesUtil.writeUnsigned(id, dictionary.getSizeOfId(), buf);
+ }
+
+ @Override
+ public Object deserialize(ByteBuffer in) {
+ int id = BytesUtil.readUnsigned(in, dictionary.getSizeOfId());
+ return dictionary.getValueFromId(id);
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return dictionary.getSizeOfId(); // every id has the same width, so the buffer is not inspected
+ }
+
+ @Override
+ public int maxLength() {
+ return dictionary.getSizeOfId();
+ }
+
+ @Override
+ public Object valueOf(byte[] value) {
+ throw new UnsupportedOperationException(); // not supported for dictionary-encoded columns
+ }
+ }
+
+ static class FixLenSerializer extends DataTypeSerializer { // value.toString() bytes right-padded to fixLen; null = all Dictionary.NULL
+
+ // per-thread scratch buffer of fixLen bytes: thread-safe without allocating on every call
+ private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
+
+ private int fixLen;
+
+ FixLenSerializer(int fixLen) {
+ this.fixLen = fixLen;
+ }
+
+ private byte[] currentBuf() { // lazily allocates this thread's buffer
+ byte[] buf = current.get();
+ if (buf == null) {
+ buf = new byte[fixLen];
+ current.set(buf);
+ }
+ return buf;
+ }
+
+ @Override
+ public void serialize(Object value, ByteBuffer out) {
+ byte[] buf = currentBuf();
+ if (value == null) {
+ Arrays.fill(buf, Dictionary.NULL); // null is encoded as fixLen NULL bytes
+ out.put(buf);
+ } else {
+ byte[] bytes = Bytes.toBytes(value.toString());
+ if (bytes.length > fixLen) {
+ throw new IllegalStateException("Expect at most " + fixLen + " bytes, but got " + bytes.length + ", value string: " + value.toString());
+ }
+ out.put(bytes);
+ for (int i = bytes.length; i < fixLen; i++) { // pad the rest with the row-key placeholder byte
+ out.put(RowConstants.ROWKEY_PLACE_HOLDER_BYTE);
+ }
+ }
+ }
+
+ @Override
+ public Object deserialize(ByteBuffer in) {
+ byte[] buf = currentBuf();
+ in.get(buf); // consumes exactly fixLen bytes
+
+ int tail = fixLen; // shrink past trailing placeholder / NULL bytes
+ while (tail > 0 && (buf[tail - 1] == RowConstants.ROWKEY_PLACE_HOLDER_BYTE || buf[tail - 1] == Dictionary.NULL)) {
+ tail--;
+ }
+
+ if (tail == 0) { // NOTE(review): with fixLen == 0, buf[0] below throws ArrayIndexOutOfBoundsException
+ return buf[0] == Dictionary.NULL ? null : ""; // decided by the first byte only
+ }
+
+ return Bytes.toString(buf, 0, tail); // NOTE(review): genuine trailing placeholder bytes in a value are lost
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return fixLen;
+ }
+
+ @Override
+ public int maxLength() {
+ return fixLen;
+ }
+
+ @Override
+ public Object valueOf(byte[] value) {
+ try {
+ return new String(value, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ // cannot happen: every Java platform must support UTF-8
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
new file mode 100644
index 0000000..66466a3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.cube.gridtable;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("rawtypes")
+public class CubeGridTable {
+
+ public static Map<TblColRef, Dictionary<?>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
+ CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+ CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+
+ // build a dictionary map
+ Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+ List<TblColRef> dimCols = Cuboid.findById(cubeDesc, cuboidId).getColumns();
+ for (TblColRef col : dimCols) {
+ Dictionary<?> dictionary = cubeMgr.getDictionary(cubeSeg, col);
+ if (dictionary != null) {
+ dictionaryMap.put(col, dictionary);
+ }
+ }
+ return dictionaryMap;
+ }
+
+ public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
+ Map<TblColRef, Dictionary<?>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
+ return newGTInfo(cubeSeg.getCubeDesc(), cuboidId, dictionaryMap);
+ }
+
+ public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+ CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
+
+ Map<Integer, Dictionary> dictionaryByColIdx = Maps.newHashMap();
+ Map<Integer, Integer> fixLenByColIdx = Maps.newHashMap();
+
+ for (TblColRef dim : cuboid.getColumns()) {
+ int colIndex = mapping.getIndexOf(dim);
+ if (cubeDesc.getRowkey().isUseDictionary(dim)) {
+ Dictionary dict = dictionaryMap.get(dim);
+ dictionaryByColIdx.put(colIndex, dict);
+ } else {
+ int len = cubeDesc.getRowkey().getColumnLength(dim);
+ if (len == 0)
+ throw new IllegalStateException();
+
+ fixLenByColIdx.put(colIndex, len);
+ }
+ }
+
+ GTInfo.Builder builder = GTInfo.builder();
+ builder.setTableName("Cuboid " + cuboidId);
+ builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx, mapping.getDependentMetricsMap()));
+ builder.setColumns(mapping.getDataTypes());
+ builder.setPrimaryKey(mapping.getPrimaryKey());
+ builder.enableColumnBlock(mapping.getColumnBlocks());
+ return builder.build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
new file mode 100644
index 0000000..cf3a119
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -0,0 +1,172 @@
+package org.apache.kylin.cube.gridtable;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class CuboidToGridTableMapping {
+
+ final private Cuboid cuboid;
+
+ private List<DataType> gtDataTypes;
+ private List<ImmutableBitSet> gtColBlocks;
+
+ private int nDimensions;
+ private Map<TblColRef, Integer> dim2gt;
+ private ImmutableBitSet gtPrimaryKey;
+
+ private int nMetrics;
+ private ListMultimap<FunctionDesc, Integer> metrics2gt; // because count distinct may have a holistic version
+
+ public CuboidToGridTableMapping(Cuboid cuboid) {
+ this.cuboid = cuboid;
+ init();
+ }
+
+ private void init() {
+ int gtColIdx = 0;
+ gtDataTypes = Lists.newArrayList();
+ gtColBlocks = Lists.newArrayList();
+
+ // dimensions
+ dim2gt = Maps.newHashMap();
+ BitSet pk = new BitSet();
+ for (TblColRef dimension : cuboid.getColumns()) {
+ gtDataTypes.add(dimension.getType());
+ dim2gt.put(dimension, gtColIdx);
+ pk.set(gtColIdx);
+ gtColIdx++;
+ }
+ gtPrimaryKey = new ImmutableBitSet(pk);
+ gtColBlocks.add(gtPrimaryKey);
+
+ nDimensions = gtColIdx;
+ assert nDimensions == cuboid.getColumns().size();
+
+ // metrics
+ metrics2gt = LinkedListMultimap.create();
+ for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ BitSet colBlock = new BitSet();
+ for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+ // Count distinct & holistic count distinct are equals() but different.
+ // Ensure the holistic version if exists is always the first.
+ FunctionDesc func = measure.getFunction();
+ if (func.isHolisticCountDistinct()) {
+ List<Integer> existing = metrics2gt.removeAll(func);
+ metrics2gt.put(func, gtColIdx);
+ metrics2gt.putAll(func, existing);
+ } else {
+ metrics2gt.put(func, gtColIdx);
+ }
+ gtDataTypes.add(func.getReturnDataType());
+ colBlock.set(gtColIdx);
+ gtColIdx++;
+ }
+ gtColBlocks.add(new ImmutableBitSet(colBlock));
+ }
+ }
+ nMetrics = gtColIdx - nDimensions;
+ assert nMetrics == cuboid.getCube().getMeasures().size();
+ }
+
+ public int getColumnCount() {
+ return nDimensions + nMetrics;
+ }
+
+ public int getDimensionCount() {
+ return nDimensions;
+ }
+
+ public int getMetricsCount() {
+ return nMetrics;
+ }
+
+ public DataType[] getDataTypes() {
+ return (DataType[]) gtDataTypes.toArray(new DataType[gtDataTypes.size()]);
+ }
+
+ public ImmutableBitSet getPrimaryKey() {
+ return gtPrimaryKey;
+ }
+
+ public ImmutableBitSet[] getColumnBlocks() {
+ return (ImmutableBitSet[]) gtColBlocks.toArray(new ImmutableBitSet[gtColBlocks.size()]);
+ }
+
+ public int getIndexOf(TblColRef dimension) {
+ Integer i = dim2gt.get(dimension);
+ return i == null ? -1 : i.intValue();
+ }
+
+ public int getIndexOf(FunctionDesc metric) {
+ List<Integer> list = metrics2gt.get(metric);
+ // normal case
+ if (list.size() == 1) {
+ return list.get(0);
+ }
+ // count distinct & its holistic version
+ else if (list.size() == 2) {
+ assert metric.isCountDistinct();
+ return metric.isHolisticCountDistinct() ? list.get(0) : list.get(1);
+ }
+ // unexpected
+ else
+ return -1;
+ }
+
+ public List<TblColRef> getCuboidDimensionsInGTOrder() {
+ return cuboid.getColumns();
+ }
+
+ public Map<Integer, Integer> getDependentMetricsMap() {
+ Map<Integer, Integer> result = Maps.newHashMap();
+ List<MeasureDesc> measures = cuboid.getCube().getMeasures();
+ for (MeasureDesc child : measures) {
+ if (child.getDependentMeasureRef() != null) {
+ boolean ok = false;
+ for (MeasureDesc parent : measures) {
+ if (parent.getName().equals(child.getDependentMeasureRef())) {
+ int childIndex = getIndexOf(child.getFunction());
+ int parentIndex = getIndexOf(parent.getFunction());
+ result.put(childIndex, parentIndex);
+ ok = true;
+ break;
+ }
+ }
+ if (!ok)
+ throw new IllegalStateException("Cannot find dependent measure: " + child.getDependentMeasureRef());
+ }
+ }
+ return result.isEmpty() ? Collections.<Integer, Integer>emptyMap() : result;
+ }
+
+ public static MeasureDesc[] getMeasureSequenceOnGridTable(CubeDesc cube) {
+ MeasureDesc[] result = new MeasureDesc[cube.getMeasures().size()];
+ int i = 0;
+ for (HBaseColumnFamilyDesc familyDesc : cube.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ for (MeasureDesc m : hbaseColDesc.getMeasures()) {
+ result[i++] = m;
+ }
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
new file mode 100644
index 0000000..518a8d2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An interface alike abstract class. Hold common tunable parameters and nothing more.
+ */
+abstract public class AbstractInMemCubeBuilder {
+
+ private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
+
+ final protected CubeDesc cubeDesc;
+ final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+ protected int taskThreadCount = 4;
+ protected int reserveMemoryMB = 100;
+
+ public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ if(cubeDesc == null)
+ throw new NullPointerException();
+ if (dictionaryMap == null)
+ throw new IllegalArgumentException("dictionary cannot be null");
+
+ this.cubeDesc = cubeDesc;
+ this.dictionaryMap = dictionaryMap;
+ }
+
+ public void setConcurrentThreads(int n) {
+ this.taskThreadCount = n;
+ }
+
+ public void setReserveMemoryMB(int mb) {
+ this.reserveMemoryMB = mb;
+ }
+
+ public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ build(input, output);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
+
+ protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
+ long startTime = System.currentTimeMillis();
+ GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+ IGTScanner scanner = gridTable.scan(req);
+ for (GTRecord record : scanner) {
+ output.write(cuboidId, record);
+ }
+ scanner.close();
+ logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
new file mode 100644
index 0000000..9598a08
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A disk store that allows concurrent read and exclusive write.
+ */
+public class ConcurrentDiskStore implements IGTStore, Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
+ private static final boolean debug = true;
+
+ private static final int STREAM_BUFFER_SIZE = 8192;
+
+ final private GTInfo info;
+ final private Object lock;
+
+ final private File diskFile;
+ final private boolean delOnClose;
+
+ private Writer activeWriter;
+ private HashSet<Reader> activeReaders = new HashSet<Reader>();
+ private FileChannel writeChannel;
+ private FileChannel readChannel; // sharable across multi-threads
+
+ public ConcurrentDiskStore(GTInfo info) throws IOException {
+ this(info, File.createTempFile("ConcurrentDiskStore", ""), true);
+ }
+
+ public ConcurrentDiskStore(GTInfo info, File diskFile) throws IOException {
+ this(info, diskFile, false);
+ }
+
+ private ConcurrentDiskStore(GTInfo info, File diskFile, boolean delOnClose) throws IOException {
+ this.info = info;
+ this.lock = this;
+ this.diskFile = diskFile;
+ this.delOnClose = delOnClose;
+
+ // in case user forget to call close()
+ if (delOnClose)
+ diskFile.deleteOnExit();
+
+ if (debug)
+ logger.debug(this + " disk file " + diskFile.getAbsolutePath());
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public IGTStoreWriter rebuild(int shard) throws IOException {
+ return newWriter(0);
+ }
+
+ @Override
+ public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+ throw new IllegalStateException("does not support append yet");
+ //return newWriter(diskFile.length());
+ }
+
+ private IGTStoreWriter newWriter(long startOffset) throws IOException {
+ synchronized (lock) {
+ if (activeWriter != null || !activeReaders.isEmpty())
+ throw new IllegalStateException();
+
+ openWriteChannel(startOffset);
+ activeWriter = new Writer(startOffset);
+ return activeWriter;
+ }
+ }
+
+ private void closeWriter(Writer w) {
+ synchronized (lock) {
+ if (activeWriter != w)
+ throw new IllegalStateException();
+
+ activeWriter = null;
+ closeWriteChannel();
+ }
+ }
+
+ @Override
+ public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+ return newReader();
+ }
+
+ private IGTStoreScanner newReader() throws IOException {
+ synchronized (lock) {
+ if (activeWriter != null)
+ throw new IllegalStateException();
+
+ openReadChannel();
+ Reader r = new Reader(0);
+ activeReaders.add(r);
+ return r;
+ }
+ }
+
+ private void closeReader(Reader r) throws IOException {
+ synchronized (lock) {
+ if (activeReaders.contains(r) == false)
+ throw new IllegalStateException();
+
+ activeReaders.remove(r);
+ if (activeReaders.isEmpty())
+ closeReadChannel();
+ }
+ }
+
+ private class Reader implements IGTStoreScanner {
+ final DataInputStream din;
+ long fileLen;
+ long readOffset;
+
+ GTRowBlock block = GTRowBlock.allocate(info);
+ GTRowBlock next = null;
+
+ Reader(long startOffset) throws IOException {
+ this.fileLen = diskFile.length();
+ this.readOffset = startOffset;
+
+ if (debug)
+ logger.debug(ConcurrentDiskStore.this + " read start @ " + readOffset);
+
+ InputStream in = new InputStream() {
+ byte[] tmp = new byte[1];
+
+ @Override
+ public int read() throws IOException {
+ int n = read(tmp, 0, 1);
+ if (n <= 0)
+ return -1;
+ else
+ return (int) tmp[0];
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (available() <= 0)
+ return -1;
+
+ int lenToGo = Math.min(available(), len);
+ int nRead = 0;
+ while (lenToGo > 0) {
+ int n = readChannel.read(ByteBuffer.wrap(b, off, lenToGo), readOffset);
+
+ lenToGo -= n;
+ nRead += n;
+ off += n;
+ readOffset += n;
+ }
+ return nRead;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int) (fileLen - readOffset);
+ }
+ };
+ din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ try {
+ if (din.available() > 0) {
+ block.importFrom(din);
+ next = block;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return next != null;
+ }
+
+ @Override
+ public GTRowBlock next() {
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+ GTRowBlock r = next;
+ next = null;
+ return r;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ din.close();
+ closeReader(this);
+
+ if (debug)
+ logger.debug(ConcurrentDiskStore.this + " read end @ " + readOffset);
+ }
+ }
+
+ private class Writer implements IGTStoreWriter {
+ final DataOutputStream dout;
+ long writeOffset;
+
+ Writer(long startOffset) {
+ this.writeOffset = startOffset;
+
+ if (debug)
+ logger.debug(ConcurrentDiskStore.this + " write start @ " + writeOffset);
+
+ OutputStream out = new OutputStream() {
+ byte[] tmp = new byte[1];
+
+ @Override
+ public void write(int b) throws IOException {
+ tmp[0] = (byte) b;
+ write(tmp, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws IOException {
+ while (length > 0) {
+ int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), writeOffset);
+ offset += n;
+ length -= n;
+ writeOffset += n;
+ }
+ }
+ };
+ dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+ }
+
+ @Override
+ public void write(GTRowBlock block) throws IOException {
+ block.export(dout);
+ }
+
+ @Override
+ public void close() throws IOException {
+ dout.close();
+ closeWriter(this);
+
+ if (debug)
+ logger.debug(ConcurrentDiskStore.this + " write end @ " + writeOffset);
+ }
+ }
+
+ private void openWriteChannel(long startOffset) throws IOException {
+ if (startOffset > 0) { // TODO does not support append yet
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+ } else {
+ diskFile.delete();
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ }
+ }
+
+ private void closeWriteChannel() {
+ IOUtils.closeQuietly(writeChannel);
+ writeChannel = null;
+ }
+
+ private void openReadChannel() throws IOException {
+ if (readChannel == null) {
+ readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+ }
+ }
+
+ private void closeReadChannel() throws IOException {
+ IOUtils.closeQuietly(readChannel);
+ readChannel = null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ if (activeWriter != null || !activeReaders.isEmpty())
+ throw new IllegalStateException();
+
+ if (delOnClose) {
+ diskFile.delete();
+ }
+
+ if (debug)
+ logger.debug(this + " closed");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ConcurrentDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
new file mode 100644
index 0000000..3d60321
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder.CuboidResult;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * When base cuboid does not fit in memory, cut the input into multiple splits and merge the split outputs at last.
+ */
+public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
+
+ private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
+
+ private int splitRowThreshold = Integer.MAX_VALUE;
+ private int unitRows = 1000;
+
+ public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ super(cubeDesc, dictionaryMap);
+ }
+
+ public void setSplitRowThreshold(int rowThreshold) {
+ this.splitRowThreshold = rowThreshold;
+ this.unitRows = Math.min(unitRows, rowThreshold);
+ }
+
+ @Override
+ public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+ new BuildOnce().build(input, output);
+ }
+
+ private class BuildOnce {
+
+ public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+ final List<SplitThread> splits = new ArrayList<SplitThread>();
+ final Merger merger = new Merger();
+
+ long start = System.currentTimeMillis();
+ logger.info("Dogged Cube Build start");
+
+ try {
+ SplitThread last = null;
+ boolean eof = false;
+
+ while (!eof) {
+
+ if (last != null && shouldCutSplit(splits)) {
+ cutSplit(last);
+ last = null;
+ }
+
+ checkException(splits);
+
+ if (last == null) {
+ last = new SplitThread();
+ splits.add(last);
+ last.start();
+ logger.info("Split #" + splits.size() + " kickoff");
+ }
+
+ eof = feedSomeInput(input, last, unitRows);
+ }
+
+ for (SplitThread split : splits) {
+ split.join();
+ }
+ checkException(splits);
+ logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
+
+ merger.mergeAndOutput(splits, output);
+
+ } catch (Throwable e) {
+ logger.error("Dogged Cube Build error", e);
+ if (e instanceof Error)
+ throw (Error) e;
+ else if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new IOException(e);
+ } finally {
+ closeGirdTables(splits);
+ logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
+ ensureExit(splits);
+ logger.info("Dogged Cube Build return");
+ }
+ }
+
+ private void closeGirdTables(List<SplitThread> splits) {
+ for (SplitThread split : splits) {
+ if (split.buildResult != null) {
+ for (CuboidResult r : split.buildResult.values()) {
+ try {
+ r.table.close();
+ } catch (Throwable e) {
+ logger.error("Error closing grid table " + r.table, e);
+ }
+ }
+ }
+ }
+ }
+
+ private void ensureExit(List<SplitThread> splits) throws IOException {
+ try {
+ for (int i = 0; i < splits.size(); i++) {
+ SplitThread split = splits.get(i);
+ if (split.isAlive()) {
+ abort(splits);
+ }
+ }
+ } catch (Throwable e) {
+ logger.error("Dogged Cube Build error", e);
+ }
+ }
+
+ private void checkException(List<SplitThread> splits) throws IOException {
+ for (int i = 0; i < splits.size(); i++) {
+ SplitThread split = splits.get(i);
+ if (split.exception != null)
+ abort(splits);
+ }
+ }
+
+ private void abort(List<SplitThread> splits) throws IOException {
+ for (SplitThread split : splits) {
+ split.builder.abort();
+ }
+
+ ArrayList<Throwable> errors = new ArrayList<Throwable>();
+ for (SplitThread split : splits) {
+ try {
+ split.join();
+ } catch (InterruptedException e) {
+ errors.add(e);
+ }
+ if (split.exception != null)
+ errors.add(split.exception);
+ }
+
+ if (errors.isEmpty()) {
+ return;
+ } else if (errors.size() == 1) {
+ Throwable t = errors.get(0);
+ if (t instanceof IOException)
+ throw (IOException) t;
+ else
+ throw new IOException(t);
+ } else {
+ for (Throwable t : errors)
+ logger.error("Exception during in-mem cube build", t);
+ throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
+ }
+ }
+
+ private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {
+ try {
+ int i = 0;
+ while (i < n) {
+ List<String> record = input.take();
+ i++;
+
+ while (split.inputQueue.offer(record, 1, TimeUnit.SECONDS) == false) {
+ if (split.exception != null)
+ return true; // got some error
+ }
+ split.inputRowCount++;
+
+ if (record == null || record.isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void cutSplit(SplitThread last) {
+ try {
+ // signal the end of input
+ while (last.isAlive()) {
+ if (last.inputQueue.offer(Collections.<String> emptyList())) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ // wait cuboid build done
+ while (last.isAlive()) {
+ if (last.builder.isAllCuboidDone()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean shouldCutSplit(List<SplitThread> splits) {
+ int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
+ int nSplit = splits.size();
+ long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
+
+ logger.debug(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold");
+
+ return splitRowCount >= splitRowThreshold || systemAvailMB <= reserveMemoryMB;
+ }
+ }
+
+ private class SplitThread extends Thread {
+ final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
+ final InMemCubeBuilder builder;
+
+ ConcurrentNavigableMap<Long, CuboidResult> buildResult;
+ long inputRowCount = 0;
+ RuntimeException exception;
+
+ public SplitThread() {
+ this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
+ this.builder.setConcurrentThreads(taskThreadCount);
+ this.builder.setReserveMemoryMB(reserveMemoryMB);
+ }
+
+ @Override
+ public void run() {
+ try {
+ buildResult = builder.build(inputQueue);
+ } catch (Exception e) {
+ if (e instanceof RuntimeException)
+ this.exception = (RuntimeException) e;
+ else
+ this.exception = new RuntimeException(e);
+ }
+ }
+ }
+
+ private class Merger {
+
+ MeasureAggregators reuseAggrs;
+ Object[] reuseMetricsArray;
+ ByteArray reuseMetricsSpace;
+
+ long lastCuboidColumnCount;
+ ImmutableBitSet lastMetricsColumns;
+
+ Merger() {
+ MeasureDesc[] measures = CuboidToGridTableMapping.getMeasureSequenceOnGridTable(cubeDesc);
+ reuseAggrs = new MeasureAggregators(measures);
+ reuseMetricsArray = new Object[measures.length];
+ }
+
+ public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
+ if (splits.size() == 1) {
+ for (CuboidResult cuboidResult : splits.get(0).buildResult.values()) {
+ outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+ cuboidResult.table.close();
+ }
+ return;
+ }
+
+ LinkedList<MergeSlot> open = Lists.newLinkedList();
+ for (SplitThread split : splits) {
+ open.add(new MergeSlot(split));
+ }
+
+ PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
+
+ while (true) {
+ // ready records in open slots and add to heap
+ while (!open.isEmpty()) {
+ MergeSlot slot = open.removeFirst();
+ if (slot.fetchNext()) {
+ heap.add(slot);
+ }
+ }
+
+ // find the smallest on heap
+ MergeSlot smallest = heap.poll();
+ if (smallest == null)
+ break;
+ open.add(smallest);
+
+ // merge with slots having the same key
+ if (smallest.isSameKey(heap.peek())) {
+ Object[] metrics = getMetricsValues(smallest.currentRecord);
+ reuseAggrs.reset();
+ reuseAggrs.aggregate(metrics);
+ do {
+ MergeSlot slot = heap.poll();
+ open.add(slot);
+ metrics = getMetricsValues(slot.currentRecord);
+ reuseAggrs.aggregate(metrics);
+ } while (smallest.isSameKey(heap.peek()));
+
+ reuseAggrs.collectStates(metrics);
+ setMetricsValues(smallest.currentRecord, metrics);
+ }
+
+ output.write(smallest.currentCuboidId, smallest.currentRecord);
+ }
+ }
+
+ private void setMetricsValues(GTRecord record, Object[] metricsValues) {
+ ImmutableBitSet metrics = getMetricsColumns(record);
+
+ if (reuseMetricsSpace == null) {
+ reuseMetricsSpace = new ByteArray(record.getInfo().getMaxColumnLength(metrics));
+ }
+
+ record.setValues(metrics, reuseMetricsSpace, metricsValues);
+ }
+
+ private Object[] getMetricsValues(GTRecord record) {
+ ImmutableBitSet metrics = getMetricsColumns(record);
+ return record.getValues(metrics, reuseMetricsArray);
+ }
+
+ private ImmutableBitSet getMetricsColumns(GTRecord record) {
+ // metrics columns always come after dimension columns
+ if (lastCuboidColumnCount == record.getInfo().getColumnCount())
+ return lastMetricsColumns;
+
+ int to = record.getInfo().getColumnCount();
+ int from = to - reuseMetricsArray.length;
+ lastCuboidColumnCount = record.getInfo().getColumnCount();
+ lastMetricsColumns = new ImmutableBitSet(from, to);
+ return lastMetricsColumns;
+ }
+ }
+
+ private static class MergeSlot implements Comparable<MergeSlot> {
+
+ final Iterator<CuboidResult> cuboidIterator;
+ IGTScanner scanner;
+ Iterator<GTRecord> recordIterator;
+
+ long currentCuboidId;
+ GTRecord currentRecord;
+
+ public MergeSlot(SplitThread split) {
+ cuboidIterator = split.buildResult.values().iterator();
+ }
+
+ public boolean fetchNext() throws IOException {
+ if (recordIterator == null) {
+ if (cuboidIterator.hasNext()) {
+ CuboidResult cuboid = cuboidIterator.next();
+ currentCuboidId = cuboid.cuboidId;
+ scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+ recordIterator = scanner.iterator();
+ } else {
+ return false;
+ }
+ }
+
+ if (recordIterator.hasNext()) {
+ currentRecord = recordIterator.next();
+ return true;
+ } else {
+ scanner.close();
+ recordIterator = null;
+ return fetchNext();
+ }
+ }
+
+ @Override
+ public int compareTo(MergeSlot o) {
+ long cuboidComp = this.currentCuboidId - o.currentCuboidId;
+ if (cuboidComp != 0)
+ return cuboidComp < 0 ? -1 : 1;
+
+ // note GTRecord.equals() don't work because the two GTRecord comes from different GridTable
+ ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
+ for (int i = 0; i < pk.trueBitCount(); i++) {
+ int c = pk.trueBitAt(i);
+ int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
+ if (comp != 0)
+ return comp;
+ }
+ return 0;
+ }
+
+ public boolean isSameKey(MergeSlot o) {
+ if (o == null)
+ return false;
+ else
+ return this.compareTo(o) == 0;
+ }
+
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
new file mode 100644
index 0000000..1afb665
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public interface ICuboidWriter {
+ void write(long cuboidId, GTRecord record) throws IOException;
+}