You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/08 12:16:12 UTC
incubator-kylin git commit: KYLIN-876 Refactor CubingJobBuilder to
decouple individual steps
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 ba9a8a143 -> e38ddb780
KYLIN-876 Refactor CubingJobBuilder to decouple individual steps
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e38ddb78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e38ddb78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e38ddb78
Branch: refs/heads/0.8
Commit: e38ddb78081dffefe19c563d74c2a279804f0e07
Parents: ba9a8a1
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Jul 7 18:24:26 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Jul 8 18:15:07 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/engine/IBatchCubeBuilder.java | 32 +++++++
.../org/apache/kylin/engine/IBuildEngine.java | 24 +++++
.../org/apache/kylin/engine/mr/IMRInput.java | 25 +++++
.../kylin/engine/mr/IMRJobFlowParticipant.java | 34 +++++++
.../org/apache/kylin/engine/mr/IMROutput.java | 25 +++++
.../apache/kylin/engine/mr/MRBuildEngine.java | 31 +++++++
.../apache/kylin/engine/mr/MRCubeBuilder.java | 45 +++++++++
.../kylin/job/common/MapReduceExecutable.java | 36 ++++++-
.../org/apache/kylin/job/cube/CubingJob.java | 36 ++++++-
.../apache/kylin/job/cube/CubingJobBuilder.java | 98 ++++++++------------
.../job/cube/UpdateCubeInfoAfterBuildStep.java | 64 +++----------
.../job/cube/UpdateCubeInfoAfterMergeStep.java | 30 ++----
.../kylin/job/execution/ChainedExecutable.java | 4 +-
.../job/execution/DefaultChainedExecutable.java | 1 +
.../kylin/job/manager/ExecutableManager.java | 8 +-
.../org/apache/kylin/source/ITableSource.java | 24 +++++
.../apache/kylin/source/hive/HiveMRInput.java | 32 +++++++
.../kylin/source/hive/HiveTableSource.java | 36 +++++++
.../java/org/apache/kylin/storage/IStorage.java | 28 ++++++
.../kylin/storage/hbase/HBaseMROutput.java | 32 +++++++
.../kylin/storage/hbase/HBaseStorage.java | 44 +++++++++
.../kylin/query/enumerator/OLAPEnumerator.java | 4 +-
.../kylin/storage/ICachableStorageEngine.java | 2 +-
.../apache/kylin/storage/IStorageEngine.java | 36 -------
.../org/apache/kylin/storage/IStorageQuery.java | 36 +++++++
.../kylin/storage/StorageEngineFactory.java | 4 +-
.../AbstractCacheFledgedStorageEngine.java | 4 +-
.../storage/hybrid/HybridStorageEngine.java | 8 +-
.../kylin/storage/test/ITStorageTest.java | 4 +-
29 files changed, 593 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java
new file mode 100644
index 0000000..2f5344a
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.ChainedExecutable;
+
+public interface IBatchCubeBuilder {
+
+ /** Build a new cube segment; typically its time range is appended to the end of the current cube. */
+ public ChainedExecutable createBuildJob(CubeSegment newSegment);
+
+ /** Merge multiple small segments into a big one. */
+ public ChainedExecutable createMergeJob(CubeSegment mergeSegment);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java b/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java
new file mode 100644
index 0000000..e54429f
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+public interface IBuildEngine {
+
+ public IBatchCubeBuilder createBatchCubeBuilder();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
new file mode 100644
index 0000000..3097a4b
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+public interface IMRInput {
+
+ public IMRJobFlowParticipant createBuildFlowParticipant();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
new file mode 100644
index 0000000..6a94920
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+public interface IMRJobFlowParticipant {
+
+ public List<? extends AbstractExecutable> contributePhase1CreateFlatTable(List<? extends AbstractExecutable> steps);
+
+ public List<? extends AbstractExecutable> contributePhase2CreateDictionary(List<? extends AbstractExecutable> steps);
+
+ public List<? extends AbstractExecutable> contributePhase3BuildCube(List<? extends AbstractExecutable> steps);
+
+ public List<? extends AbstractExecutable> contributePhase4UpdateCubeMetadata(List<? extends AbstractExecutable> steps);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
new file mode 100644
index 0000000..697957e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+public interface IMROutput {
+
+ public IMRJobFlowParticipant createBuildFlowParticipant();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java
new file mode 100644
index 0000000..80b9009
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.engine.IBatchCubeBuilder;
+import org.apache.kylin.engine.IBuildEngine;
+
+public class MRBuildEngine implements IBuildEngine {
+
+ @Override
+ public IBatchCubeBuilder createBatchCubeBuilder() {
+ return new MRCubeBuilder();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java
new file mode 100644
index 0000000..6bf0423
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubeBuilder;
+import org.apache.kylin.job.execution.ChainedExecutable;
+
+public class MRCubeBuilder implements IBatchCubeBuilder {
+
+ @Override
+ public ChainedExecutable createBuildJob(CubeSegment newSegment) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ChainedExecutable createMergeJob(CubeSegment mergeSegment) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /** Build a new segment and merge it with the existing segment in one step. Deprecated; only needed by Holistic Distinct Count. */
+ public ChainedExecutable createBuildAndMergeJob(CubeSegment appendSegment, CubeSegment mergeSegment) {
+ return null;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index af6cb9c..f8eab6c 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -53,6 +53,8 @@ public class MapReduceExecutable extends AbstractExecutable {
private static final String KEY_MR_JOB = "MR_JOB_CLASS";
private static final String KEY_PARAMS = "MR_JOB_PARAMS";
+ private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
+
public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
public MapReduceExecutable() {
@@ -145,11 +147,8 @@ public class MapReduceExecutable extends AbstractExecutable {
status = newStatus;
executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
if (status.isComplete()) {
- hadoopCmdOutput.updateJobCounter();
final Map<String, String> info = hadoopCmdOutput.getInfo();
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
+ readCounters(hadoopCmdOutput, info);
executableManager.addJobInfo(getId(), info);
if (status == JobStepStatusEnum.FINISHED) {
@@ -173,6 +172,27 @@ public class MapReduceExecutable extends AbstractExecutable {
}
}
+ private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
+ hadoopCmdOutput.updateJobCounter();
+ info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
+ info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
+
+ String saveAs = getParam(KEY_COUNTER_SAVEAS);
+ if (saveAs != null) {
+ String[] saveAsNames = saveAs.split(",");
+ saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
+ saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+ saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
+ }
+ }
+
+ private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
+ if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) {
+ info.put(saveAsNames[i].trim(), counter);
+ }
+ }
+
private String getRestStatusCheckUrl(Job job, KylinConfig config) {
final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
if (yarnStatusCheckUrl != null) {
@@ -216,5 +236,13 @@ public class MapReduceExecutable extends AbstractExecutable {
public String getMapReduceParams() {
return getParam(KEY_PARAMS);
}
+
+ public String getCounterSaveAs() {
+ return getParam(KEY_COUNTER_SAVEAS);
+ }
+
+ public void setCounterSaveAs(String value) {
+ setParam(KEY_COUNTER_SAVEAS, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
index e8bfa68..be52040 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
@@ -39,14 +39,18 @@ import org.apache.kylin.job.execution.Output;
*/
public class CubingJob extends DefaultChainedExecutable {
- public CubingJob() {
- super();
- }
-
+ // Keys of the Output.extraInfo map; this info is passed across job steps
+ public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+ public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
+ public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
+ public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
+
private static final String CUBE_INSTANCE_NAME = "cubeName";
private static final String SEGMENT_ID = "segmentId";
- public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
+ public CubingJob() {
+ super();
+ }
void setCubeName(String name) {
setParam(CUBE_INSTANCE_NAME, name);
@@ -136,4 +140,26 @@ public class CubingJob extends DefaultChainedExecutable {
public void setMapReduceWaitTime(long t) {
addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
}
+
+ public long findSourceRecordCount() {
+ return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
+ }
+
+ public long findSourceSizeBytes() {
+ return Long.parseLong(findExtraInfo(SOURCE_SIZE_BYTES, "0"));
+ }
+
+ public long findCubeSizeBytes() {
+ return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
+ }
+
+ private String findExtraInfo(String key, String dft) {
+ for (AbstractExecutable child : getTasks()) {
+ Output output = executableManager.getOutput(child.getId());
+ String value = output.getExtra().get(key);
+ if (value != null)
+ return value;
+ }
+ return dft;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 1e73436..21d1f8d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -18,10 +18,13 @@
package org.apache.kylin.job.cube;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.job.AbstractJobBuilder;
@@ -30,7 +33,12 @@ import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.cube.*;
+import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
+import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
+import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
+import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
@@ -39,11 +47,8 @@ import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
*/
@@ -65,22 +70,17 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
// cubing
- Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result);
- String intermediateHiveTableStepId = twoSteps.getFirst().getId();
- String baseCuboidStepId = twoSteps.getSecond().getId();
-
+ addCubingSteps(seg, cuboidRootPath, result);
if (this.inMemoryCubing()) {
- result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, baseCuboidStepId, jobId));
+ result.addTask(createUpdateCubeInfoAfterBuildStep(seg, jobId));
} else {
// convert htable
- AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, cuboidRootPath, result);
-
+ addHTableSteps(seg, cuboidRootPath, result);
// update cube info
- result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
+ result.addTask(createUpdateCubeInfoAfterBuildStep(seg, jobId));
}
-
final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable));
@@ -97,12 +97,10 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
// cubing the incremental segment
- Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result);
- final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
- final String baseCuboidStepId = twoSteps.getSecond().getId();
+ addCubingSteps(appendSegment, appendRootPath, result);
// update the append segment info
- result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, intermediateHiveTableStepId, baseCuboidStepId, null, jobId));
+ result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, jobId));
List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
@@ -118,23 +116,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
mergingCuboidPaths.add(getPathToMerge(merging));
}
- AbstractExecutable convertCuboidToHfileStep;
-
if (this.inMemoryCubing()) {
// merge from HTable
- convertCuboidToHfileStep = addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
-
+ addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
// bulk load step
result.addTask(createBulkLoadStep(mergeSegment, result.getId()));
} else {
// merge cuboid
addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
// convert htable
- convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
+ addHTableSteps(mergeSegment, mergedRootPath, result);
}
// update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
+ result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, jobId));
result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null));
@@ -159,23 +154,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
mergingHTables.add(merging.getStorageLocationIdentifier());
}
-
- AbstractExecutable convertCuboidToHfileStep;
if (this.inMemoryCubing()) {
// merge from HTable
- convertCuboidToHfileStep = addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
-
+ addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
// bulk load step
result.addTask(createBulkLoadStep(seg, result.getId()));
} else {
// merge cuboid
addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
// convert htable
- convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
+ addHTableSteps(seg, mergedCuboidPath, result);
}
// update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
+ result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, jobId));
result.addTask(createGarbageCollectionStep(seg, mergingHTables, null));
@@ -191,7 +183,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
- AbstractExecutable addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
+ void addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
@@ -203,13 +195,10 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
String formattedTables = StringUtils.join(mergingHTables, ",");
String hFilePath = getHFilePath(seg, result.getId());
- MapReduceExecutable writeHFileStep = createMergeCuboidDataFromHBaseStep(seg, formattedTables, hFilePath);
- result.addTask(writeHFileStep);
-
- return writeHFileStep;
+ result.addTask(createMergeCuboidDataFromHBaseStep(seg, formattedTables, hFilePath));
}
- Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+ void addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
@@ -223,12 +212,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
result.addTask(intermediateHiveTableStep);
result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId));
- MapReduceExecutable baseCuboidStep = null;
+
if (!inMemoryCubing()) {
result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
// base cuboid step
- baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath);
- result.addTask(baseCuboidStep);
+ result.addTask(createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath));
// n dim cuboid steps
for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
@@ -240,16 +228,13 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
result.addTask(createSaveStatisticsStep(seg, getStatisticsPath(seg, jobId)));
result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
result.addTask(createCreateHTableStep(seg));
- baseCuboidStep = createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId());
- result.addTask(baseCuboidStep);
+ result.addTask(createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId()));
// bulk load step
result.addTask(createBulkLoadStep(seg, result.getId()));
}
-
- return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
}
- AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+ void addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
final String jobId = result.getId();
final String cuboidPath = cuboidRootPath + "*";
@@ -257,12 +242,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// create htable step
result.addTask(createCreateHTableStep(seg));
// generate hfiles step
- final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
- result.addTask(convertCuboidToHfileStep);
+ result.addTask(createConvertCuboidToHfileStep(seg, cuboidPath, jobId));
// bulk load step
result.addTask(createBulkLoadStep(seg, jobId));
-
- return convertCuboidToHfileStep;
}
private CubingJob initialJob(CubeSegment seg, String type) {
@@ -385,6 +367,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+ baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
return baseCuboidStep;
}
@@ -406,6 +389,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+ baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return baseCuboidStep;
}
@@ -475,6 +459,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
createHFilesStep.setMapReduceParams(cmd.toString());
createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
+ createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
return createHFilesStep;
}
@@ -495,14 +480,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
- private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId, String jobId) {
+ private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String jobId) {
final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
updateCubeInfoStep.setSegmentId(seg.getUuid());
- updateCubeInfoStep.setCreateFlatTableStepId(createFlatTableStepId);
- updateCubeInfoStep.setBaseCuboidStepId(baseCuboidStepId);
- updateCubeInfoStep.setConvertToHFileStepId(convertToHFileStepId);
updateCubeInfoStep.setCubingJobId(jobId);
return updateCubeInfoStep;
}
@@ -570,16 +552,16 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+ mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
return mergeCuboidDataStep;
}
- private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String convertToHFileStepId, String jobId) {
+ private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String jobId) {
UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
result.setCubeName(seg.getCubeInstance().getName());
result.setSegmentId(seg.getUuid());
result.setMergingSegmentIds(mergingSegmentIds);
- result.setConvertToHFileStepId(convertToHFileStepId);
result.setCubingJobId(jobId);
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
index 617cdb5..8ad1f7a 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
@@ -18,21 +18,16 @@
package org.apache.kylin.job.cube;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.cube.CubeInstance;
+import java.io.IOException;
+
import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-
-import java.io.IOException;
/**
*/
@@ -40,9 +35,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
private static final String SEGMENT_ID = "segmentId";
private static final String CUBE_NAME = "cubeName";
- private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId";
- private static final String BASE_CUBOID_STEP_ID = "baseCuboidStepId";
- private static final String CREATE_FLAT_TABLE_STEP_ID = "createFlatTableStepId";
private static final String CUBING_JOB_ID = "cubingJobId";
public UpdateCubeInfoAfterBuildStep() {
@@ -65,26 +57,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
return getParam(SEGMENT_ID);
}
- public void setConvertToHFileStepId(String id) {
- setParam(CONVERT_TO_HFILE_STEP_ID, id);
- }
-
- private String getConvertToHfileStepId() {
- return getParam(CONVERT_TO_HFILE_STEP_ID);
- }
-
- public void setBaseCuboidStepId(String id) {
- setParam(BASE_CUBOID_STEP_ID, id);
- }
-
- private String getBaseCuboidStepId() {
- return getParam(BASE_CUBOID_STEP_ID);
- }
-
- public void setCreateFlatTableStepId(String id) {
- setParam(CREATE_FLAT_TABLE_STEP_ID, id);
- }
-
public void setCubingJobId(String id) {
setParam(CUBING_JOB_ID, id);
}
@@ -98,32 +70,18 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = cubeManager.getCube(getCubeName());
final CubeSegment segment = cube.getSegmentById(getSegmentId());
-
- Output baseCuboidOutput = executableManager.getOutput(getBaseCuboidStepId());
- String sourceRecordsCount = baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_COUNT);
- Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsCount), "Can't get cube source record count.");
- long sourceCount = Long.parseLong(sourceRecordsCount);
-
- String sourceRecordsSize = baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_SIZE);
- Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsSize), "Can't get cube source record size.");
- long sourceSize = Long.parseLong(sourceRecordsSize);
-
- long size = 0;
- boolean segmentReady = true;
- if (!StringUtils.isBlank(getConvertToHfileStepId())) {
- String cubeSizeString = executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN);
- Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
- size = Long.parseLong(cubeSizeString) / 1024;
- } else {
- // for the increment & merge case, the increment segment is only built to be merged, won't serve query by itself
- segmentReady = false;
- }
+
+ CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+ long sourceCount = cubingJob.findSourceRecordCount();
+ long sourceSizeBytes = cubingJob.findSourceSizeBytes();
+ long cubeSizeBytes = cubingJob.findCubeSizeBytes();
+ boolean segmentReady = cubeSizeBytes > 0; // for build+merge scenario, convert HFile not happen yet, so cube size is 0
segment.setLastBuildJobID(getCubingJobId());
segment.setLastBuildTime(System.currentTimeMillis());
- segment.setSizeKB(size);
+ segment.setSizeKB(cubeSizeBytes / 1024);
segment.setInputRecords(sourceCount);
- segment.setInputRecordsSize(sourceSize);
+ segment.setInputRecordsSize(sourceSizeBytes);
try {
if (segmentReady) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
index d50bd6a..47e5a31 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
@@ -18,23 +18,22 @@
package org.apache.kylin.job.cube;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.collect.Lists;
/**
*/
@@ -43,7 +42,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
private static final String CUBE_NAME = "cubeName";
private static final String SEGMENT_ID = "segmentId";
private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
- private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId";
private static final String CUBING_JOB_ID = "cubingJobId";
private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -60,9 +58,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
if (mergedSegment == null) {
return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
}
- String cubeSizeString = executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN);
- Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
- long cubeSize = Long.parseLong(cubeSizeString) / 1024;
+
+ CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+ long cubeSizeBytes = cubingJob.findCubeSizeBytes();
// collect source statistics
List<String> mergingSegmentIds = getMergingSegmentIds();
@@ -78,7 +76,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
}
// update segment info
- mergedSegment.setSizeKB(cubeSize);
+ mergedSegment.setSizeKB(cubeSizeBytes / 1024);
mergedSegment.setInputRecords(sourceCount);
mergedSegment.setInputRecordsSize(sourceSize);
mergedSegment.setLastBuildJobID(getCubingJobId());
@@ -127,14 +125,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
}
}
- public void setConvertToHFileStepId(String id) {
- setParam(CONVERT_TO_HFILE_STEP_ID, id);
- }
-
- private String getConvertToHfileStepId() {
- return getParam(CONVERT_TO_HFILE_STEP_ID);
- }
-
public void setCubingJobId(String id) {
setParam(CUBING_JOB_ID, id);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
index f352ead..d5a7aae 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
@@ -24,6 +24,8 @@ import java.util.List;
*/
public interface ChainedExecutable extends Executable {
- List<? extends Executable> getTasks();
+ List<? extends AbstractExecutable> getTasks();
+
+ void addTask(AbstractExecutable executable);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 19b0d74..6443762 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -114,6 +114,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
return null;
}
+ @Override
public void addTask(AbstractExecutable executable) {
executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
this.subTasks.add(executable);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index a2b310b..05f8c8e 100644
--- a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -280,9 +280,9 @@ public class ExecutableManager {
result.setUuid(executable.getId());
result.setType(executable.getClass().getName());
result.setParams(executable.getParams());
- if (executable instanceof DefaultChainedExecutable) {
+ if (executable instanceof ChainedExecutable) {
List<ExecutablePO> tasks = Lists.newArrayList();
- for (AbstractExecutable task : ((DefaultChainedExecutable) executable).getTasks()) {
+ for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
tasks.add(parse(task));
}
result.setTasks(tasks);
@@ -304,9 +304,9 @@ public class ExecutableManager {
result.setParams(executablePO.getParams());
List<ExecutablePO> tasks = executablePO.getTasks();
if (tasks != null && !tasks.isEmpty()) {
- Preconditions.checkArgument(result instanceof DefaultChainedExecutable);
+ Preconditions.checkArgument(result instanceof ChainedExecutable);
for (ExecutablePO subTask: tasks) {
- ((DefaultChainedExecutable) result).addTask(parseTo(subTask));
+ ((ChainedExecutable) result).addTask(parseTo(subTask));
}
}
return result;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/ITableSource.java b/job/src/main/java/org/apache/kylin/source/ITableSource.java
new file mode 100644
index 0000000..ae1dccc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/ITableSource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+public interface ITableSource {
+ /** Adapts this table source to the build-engine interface identified by {@code engineInterface} and returns the adapter typed as that interface. */
+ public <I> I adaptToBuildEngine(Class<I> engineInterface);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
new file mode 100644
index 0000000..a0cd62e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import org.apache.kylin.engine.mr.IMRInput;
+
+public class HiveMRInput implements IMRInput {
+ /** Placeholder implementation: no build-flow participant is created yet, so this always returns null. */
+ @Override
+ public IMRJobFlowParticipant createBuildFlowParticipant() {
+ // TODO Auto-generated method stub -- replace with a real IMRJobFlowParticipant; callers currently get null
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
new file mode 100644
index 0000000..4265519
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.source.ITableSource;
+
+public class HiveTableSource implements ITableSource {
+ /** Adapts to {@code IMRInput} by returning a new {@code HiveMRInput}; any other interface is rejected with a RuntimeException. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+ if (engineInterface == IMRInput.class) {
+ return (I) new HiveMRInput();
+ } else {
+ throw new RuntimeException("Cannot adapt to " + engineInterface);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/IStorage.java b/job/src/main/java/org/apache/kylin/storage/IStorage.java
new file mode 100644
index 0000000..89b96e9
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/IStorage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.IRealization;
+
+public interface IStorage {
+ /** Creates the {@code IStorageQuery} used to query the given realization. */
+ public IStorageQuery createStorageQuery(IRealization realization);
+ /** Adapts this storage to the build-engine interface identified by {@code engineInterface} and returns the adapter typed as that interface. */
+ public <I> I adaptToBuildEngine(Class<I> engineInterface);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
new file mode 100644
index 0000000..00c2eef
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import org.apache.kylin.engine.mr.IMROutput;
+
+public class HBaseMROutput implements IMROutput {
+ /** Placeholder implementation: no build-flow participant is created yet, so this always returns null. */
+ @Override
+ public IMRJobFlowParticipant createBuildFlowParticipant() {
+ // TODO Auto-generated method stub -- replace with a real IMRJobFlowParticipant; callers currently get null
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
new file mode 100644
index 0000000..7951b44
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.storage.IStorage;
+import org.apache.kylin.storage.IStorageQuery;
+
+public class HBaseStorage implements IStorage {
+ /** Placeholder implementation: no storage query is created yet, so this always returns null. */
+ @Override
+ public IStorageQuery createStorageQuery(IRealization realization) {
+ // TODO Auto-generated method stub -- replace with a real IStorageQuery; callers currently get null
+ return null;
+ }
+ /** Adapts to {@code IMROutput} by returning a new {@code HBaseMROutput}; any other interface is rejected with a RuntimeException. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+ if (engineInterface == IMROutput.class) {
+ return (I) new HBaseMROutput();
+ } else {
+ throw new RuntimeException("Cannot adapt to " + engineInterface);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index eaf668e..e938cbd 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -31,7 +31,7 @@ import org.apache.kylin.metadata.filter.TupleFilterSerializer;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageEngineFactory;
import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
import org.slf4j.Logger;
@@ -111,7 +111,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
olapContext.resetSQLDigest();
// query storage engine
- IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization);
+ IStorageQuery storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization);
ITupleIterator iterator = storageEngine.search(olapContext.storageContext, olapContext.getSQLDigest(), olapContext.returnTupleInfo);
if (logger.isDebugEnabled()) {
logger.debug("return TupleIterator...");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
index 90c5c95..1fcd9b3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
@@ -4,7 +4,7 @@ import com.google.common.collect.Range;
/**
*/
-public interface ICachableStorageEngine extends IStorageEngine {
+public interface ICachableStorageEngine extends IStorageQuery {
/**
*
* being dynamic => getVolatilePeriod() return not null
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
deleted file mode 100644
index 46a94ca..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-
-/**
- *
- * @author xjiang
- *
- */
-public interface IStorageEngine {
-
- ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo);
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
new file mode 100644
index 0000000..f090ebb
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+/**
+ * Query-side storage interface; declares the single {@code search} operation.
+ * @author xjiang
+ *
+ */
+public interface IStorageQuery {
+ /** Runs a search for {@code sqlDigest} under {@code context} and returns a tuple iterator; {@code returnTupleInfo} presumably describes the returned tuples (confirm with implementations). */
+ ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo);
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 0d720ab..3afcd32 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -42,7 +42,7 @@ import com.google.common.base.Preconditions;
public class StorageEngineFactory {
private static boolean allowStorageLayerCache = true;
- public static IStorageEngine getStorageEngine(IRealization realization) {
+ public static IStorageQuery getStorageEngine(IRealization realization) {
if (realization.getType() == RealizationType.INVERTED_INDEX) {
ICachableStorageEngine ret = new InvertedIndexStorageEngine((IIInstance) realization);
@@ -63,7 +63,7 @@ public class StorageEngineFactory {
}
}
- private static IStorageEngine wrapWithCache(ICachableStorageEngine underlyingStorageEngine, IRealization realization) {
+ private static IStorageQuery wrapWithCache(ICachableStorageEngine underlyingStorageEngine, IRealization realization) {
if (underlyingStorageEngine.isDynamic()) {
return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
index 422e335..34a8066 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
@@ -11,13 +11,13 @@ import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
import org.apache.kylin.metadata.realization.StreamSQLDigest;
import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*/
-public abstract class AbstractCacheFledgedStorageEngine implements IStorageEngine, TeeTupleItrListener {
+public abstract class AbstractCacheFledgedStorageEngine implements IStorageQuery, TeeTupleItrListener {
private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedStorageEngine.class);
private static final String storageCacheTemplate = "StorageCache";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 3698688..3243698 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -5,7 +5,7 @@ import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.StorageEngineFactory;
import org.apache.kylin.storage.tuple.TupleInfo;
@@ -14,14 +14,14 @@ import java.util.List;
/**
*/
-public class HybridStorageEngine implements IStorageEngine {
+public class HybridStorageEngine implements IStorageQuery {
private IRealization[] realizations;
- private IStorageEngine[] storageEngines;
+ private IStorageQuery[] storageEngines;
public HybridStorageEngine(HybridInstance hybridInstance) {
this.realizations = hybridInstance.getRealizations();
- storageEngines = new IStorageEngine[realizations.length];
+ storageEngines = new IStorageQuery[realizations.length];
for (int i = 0; i < realizations.length; i++) {
storageEngines[i] = StorageEngineFactory.getStorageEngine(realizations[i]);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
index 1e32ef5..2d75535 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.StorageEngineFactory;
import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertTrue;
public class ITStorageTest extends HBaseMetadataTestCase {
- private IStorageEngine storageEngine;
+ private IStorageQuery storageEngine;
private CubeInstance cube;
private StorageContext context;