You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/31 13:20:05 UTC
[48/50] [abbrv] kylin git commit: Merge commit
'5f2eff68d80ea6264d7590e14c052114c3cd6b74'
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
----------------------------------------------------------------------
diff --cc core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
index 0000000,904c4bd..dd18c91
mode 000000,100644..100644
--- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
+++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
@@@ -1,0 -1,142 +1,142 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.job.metrics;
+
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.metrics.MetricsManager;
+ import org.apache.kylin.metrics.lib.impl.RecordEvent;
+ import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+ import org.apache.kylin.metrics.property.JobPropertyEnum;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class JobMetricsFacade {
+ private static final Logger logger = LoggerFactory.getLogger(JobMetricsFacade.class);
+
+ public static void updateMetrics(JobStatisticsResult jobStats) {
+ if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForJobEnabled()) {
+ return;
+ }
+ /**
+ * report job related metrics
+ */
+ RecordEvent metricsEvent;
+ if (jobStats.throwable == null) {
+ metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob());
+ setJobWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId,
+ jobStats.jobType, jobStats.cubingType);
+ setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, jobStats.buildDuration,
+ jobStats.waitResourceTime, jobStats.perBytesTimeCost, //
+ jobStats.dColumnDistinct, jobStats.dDictBuilding, jobStats.dCubingInmem, jobStats.dHfileConvert);
+ } else {
+ metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException());
+ setJobExceptionWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId,
+ jobStats.jobType, jobStats.cubingType, //
+ jobStats.throwable.getClass());
+ }
+ MetricsManager.getInstance().update(metricsEvent);
+ }
+
+ private static void setJobWrapper(RecordEvent metricsEvent, String user, String projectName, String cubeName,
+ String jobId, String jobType, String cubingType) {
+ metricsEvent.put(JobPropertyEnum.USER.toString(), user);
+ metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
+ metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+ metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
+ metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
+ metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
+ }
+
+ private static void setJobStats(RecordEvent metricsEvent, long tableSize, long cubeSize, long buildDuration,
+ long waitResourceTime, double perBytesTimeCost, long dColumnDistinct, long dDictBuilding, long dCubingInmem,
+ long dHfileConvert) {
+ metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
+ metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
+ metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration);
+ metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime);
+ metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), perBytesTimeCost);
+ metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), dColumnDistinct);
+ metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), dDictBuilding);
+ metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem);
+ metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert);
+ }
+
+ private static <T extends Throwable> void setJobExceptionWrapper(RecordEvent metricsEvent, String user,
+ String projectName, String cubeName, String jobId, String jobType, String cubingType,
+ Class<T> throwableClass) {
+ setJobWrapper(metricsEvent, user, projectName, cubeName, jobId, jobType, cubingType);
+ metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), throwableClass.getName());
+ }
+
+ public static class JobStatisticsResult {
+ // dimensions
+ private String user;
+ private String projectName;
+ private String cubeName;
+ private String jobId;
+ private String jobType;
+ private String cubingType;
+
+ // statistics
+ private long tableSize;
+ private long cubeSize;
+ private long buildDuration;
+ private long waitResourceTime;
+ private double perBytesTimeCost;
+
+ // step statistics
+ private long dColumnDistinct = 0L;
+ private long dDictBuilding = 0L;
+ private long dCubingInmem = 0L;
+ private long dHfileConvert = 0L;
+
+ // exception
+ private Throwable throwable;
+
+ public void setWrapper(String user, String projectName, String cubeName, String jobId, String jobType,
+ String cubingType) {
+ this.user = user;
- this.projectName = projectName;
++ this.projectName = projectName == null ? null : projectName.toUpperCase();
+ this.cubeName = cubeName;
+ this.jobId = jobId;
+ this.jobType = jobType;
+ this.cubingType = cubingType;
+ }
+
+ public void setJobStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime,
+ double perBytesTimeCost) {
+ this.tableSize = tableSize;
+ this.cubeSize = cubeSize;
+ this.buildDuration = buildDuration;
+ this.waitResourceTime = waitResourceTime;
+ this.perBytesTimeCost = perBytesTimeCost;
+ }
+
+ public void setJobStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) {
+ this.dColumnDistinct = dColumnDistinct;
+ this.dDictBuilding = dDictBuilding;
+ this.dCubingInmem = dCubingInmem;
+ this.dHfileConvert = dHfileConvert;
+ }
+
+ public void setJobException(Throwable throwable) {
+ this.throwable = throwable;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
----------------------------------------------------------------------
diff --cc core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
index fd17370,0000000..ee29b13
mode 100644,000000..100644
--- a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
@@@ -1,46 -1,0 +1,46 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+/**
+ */
+public class FiveSecondSucceedTestExecutable extends BaseTestExecutable {
+
+ public FiveSecondSucceedTestExecutable() {
+ super();
+ }
+
+ public FiveSecondSucceedTestExecutable(int sleepTime) {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
++ return ExecuteResult.createSucceed();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
----------------------------------------------------------------------
diff --cc core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
index 0000000,61b1742..f656c44
mode 000000,100644..100644
--- a/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
@@@ -1,0 -1,50 +1,50 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.job;
+
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.job.execution.ExecutableContext;
+ import org.apache.kylin.job.execution.ExecuteResult;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ */
+ public class RetryableTestExecutable extends BaseTestExecutable {
+ private static final Logger logger = LoggerFactory.getLogger(RetryableTestExecutable.class);
+
+ public RetryableTestExecutable() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) {
+ logger.debug("run retryable exception test. ");
+ String[] exceptions = KylinConfig.getInstanceFromEnv().getJobRetryExceptions();
+ Throwable ex = null;
- if (exceptions != null && exceptions[0] != null) {
++ if (exceptions != null && exceptions.length > 0) {
+ try {
+ ex = (Throwable) Class.forName(exceptions[0]).newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return new ExecuteResult(ExecuteResult.State.ERROR, null, ex);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
----------------------------------------------------------------------
diff --cc core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
index 89057e6,0000000..1826850
mode 100644,000000..100644
--- a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
@@@ -1,39 -1,0 +1,39 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+public class RunningTestExecutable extends BaseTestExecutable {
+
+ public RunningTestExecutable() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
++ return ExecuteResult.createSucceed();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --cc core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index b1fc544,badd483..d1b7d96
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@@ -31,9 -30,7 +31,10 @@@ import org.apache.kylin.common.KylinCon
import org.apache.kylin.job.BaseTestExecutable;
import org.apache.kylin.job.ErrorTestExecutable;
import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.FiveSecondSucceedTestExecutable;
+import org.apache.kylin.job.NoErrorStatusExecutable;
+ import org.apache.kylin.job.RetryableTestExecutable;
+import org.apache.kylin.job.RunningTestExecutable;
import org.apache.kylin.job.SelfStopExecutable;
import org.apache.kylin.job.SucceedTestExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@@ -174,56 -148,18 +175,71 @@@ public class DefaultSchedulerTest exten
}
@Test
+ public void tesMetaStoreRecover() throws Exception {
+ logger.info("tesMetaStoreRecover");
+ NoErrorStatusExecutable job = new NoErrorStatusExecutable();
+ ErrorTestExecutable task = new ErrorTestExecutable();
+ job.addTask(task);
+ execMgr.addJob(job);
+ Thread.sleep(2000);
+ runningJobToError(job.getId());
+ Thread.sleep(2000);
+ Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState());
+ }
+
+ @Test
+ public void testSchedulerStop() throws Exception {
+ logger.info("testSchedulerStop");
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("too long wait time");
+
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable();
+ job.addTask(task1);
+ execMgr.addJob(job);
+
+ //sleep 3s to make sure SucceedTestExecutable is running
+ Thread.sleep(3000);
+ //scheduler failed due to some reason
+ scheduler.shutdown();
+
+ waitForJobFinish(job.getId(), 6000);
+ }
+
+ @Test
+ public void testSchedulerRestart() throws Exception {
+ logger.info("testSchedulerRestart");
+
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable();
+ job.addTask(task1);
+ execMgr.addJob(job);
+
+ //sleep 3s to make sure SucceedTestExecutable is running
+ Thread.sleep(3000);
+ //scheduler failed due to some reason
+ scheduler.shutdown();
+ //restart
+ startScheduler();
+
+ waitForJobFinish(job.getId(), 10000);
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+ }
++
+ public void testRetryableException() throws Exception {
+ System.setProperty("kylin.job.retry-exception-classes", "java.io.FileNotFoundException");
+ System.setProperty("kylin.job.retry", "3");
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new RetryableTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
- Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
++ execMgr.addJob(job);
++ waitForJobFinish(job.getId(), 10000);
++ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
++ Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState());
++ Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState());
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
----------------------------------------------------------------------
diff --cc core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index efbc33e,ab55563..f09c47c
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@@ -55,19 -58,51 +55,21 @@@ import com.google.common.collect.Maps
*/
public class TableMetadataManager {
+ @SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(TableMetadataManager.class);
+ public static final Serializer<TableDesc> TABLE_SERIALIZER = new JsonSerializer<TableDesc>(TableDesc.class);
- public static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new JsonSerializer<TableExtDesc>(
++
+ private static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new JsonSerializer<TableExtDesc>(
TableExtDesc.class);
- public static final Serializer<ExternalFilterDesc> EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(
- ExternalFilterDesc.class);
-
- // static cached instances
- private static final ConcurrentMap<KylinConfig, TableMetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, TableMetadataManager>();
public static TableMetadataManager getInstance(KylinConfig config) {
- TableMetadataManager r = CACHE.get(config);
- if (r != null) {
- return r;
- }
-
- synchronized (TableMetadataManager.class) {
- r = CACHE.get(config);
- if (r != null) {
- return r;
- }
- try {
- r = new TableMetadataManager(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one singleton exist, current keys: {}", StringUtils
- .join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, String>() {
- @Nullable
- @Override
- public String apply(@Nullable KylinConfig input) {
- return String.valueOf(System.identityHashCode(input));
- }
- }), ","));
- }
-
- return r;
- } catch (IOException e) {
- throw new IllegalStateException("Failed to init TableMetadataManager from " + config, e);
- }
- }
+ return config.getManager(TableMetadataManager.class);
}
- public static void clearCache() {
- CACHE.clear();
+ // called by reflection
+ static TableMetadataManager newInstance(KylinConfig config) throws IOException {
+ return new TableMetadataManager(config);
}
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --cc core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index fa97596,d8b33c0..7597d40
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@@ -154,20 -146,12 +154,19 @@@ public class FunctionDesc implements Se
}
public DataType getRewriteFieldType() {
-
- if (isMax() || isMin())
- return parameter.getColRefs().get(0).getType();
- else if (getMeasureType() instanceof BasicMeasureType)
- return returnDataType;
- else
+ if (getMeasureType() instanceof BasicMeasureType) {
+ if (isMax() || isMin()) {
+ return parameter.getColRefs().get(0).getType();
+ } else if (isSum()) {
+ return parameter.getColRefs().get(0).getType();
+ } else if (isCount()) {
+ return DataType.getType("bigint");
+ } else {
+ throw new IllegalArgumentException("unknown measure type " + getMeasureType());
+ }
+ } else {
return DataType.ANY;
+ }
}
public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@@ -247,10 -231,18 +246,18 @@@
return expression;
}
++ public void setExpression(String expression) {
++ this.expression = expression;
++ }
++
public ParameterDesc getParameter() {
return parameter;
}
+ public void setParameter(ParameterDesc parameter) {
+ this.parameter = parameter;
+ }
+
- public void setExpression(String expression) {
- this.expression = expression;
- }
-
public int getParameterCount() {
int count = 0;
for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --cc core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 0029de2,a7d37e7..9b7aaf2
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@@ -50,8 -50,7 +50,33 @@@ import com.google.common.collect.Lists
public class ProjectInstance extends RootPersistentEntity {
public static final String DEFAULT_PROJECT_NAME = "default";
+
++ public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) {
++ ProjectInstance projectInstance = new ProjectInstance();
++
++ projectInstance.updateRandomUuid();
++ projectInstance.setName(name);
++ projectInstance.setOwner(owner);
++ projectInstance.setDescription(description);
++ projectInstance.setStatus(ProjectStatusEnum.ENABLED);
++ projectInstance.setCreateTimeUTC(System.currentTimeMillis());
++ projectInstance.setOverrideKylinProps(overrideProps);
++
++ if (realizationEntries != null)
++ projectInstance.setRealizationEntries(realizationEntries);
++ else
++ projectInstance.setRealizationEntries(Lists.<RealizationEntry> newArrayList());
++ if (models != null)
++ projectInstance.setModels(models);
++ else
++ projectInstance.setModels(new ArrayList<String>());
++ return projectInstance;
++ }
++
++ // ============================================================================
++
+ private KylinConfigExt config;
+
@JsonProperty("name")
private String name;
@@@ -87,50 -86,53 +112,44 @@@
@JsonInclude(JsonInclude.Include.NON_NULL)
private LinkedHashMap<String, String> overrideKylinProps;
- public String getResourcePath() {
- return concatResourcePath(resourceName());
- }
- private KylinConfigExt config;
++ public void init() {
++ if (name == null)
++ name = ProjectInstance.DEFAULT_PROJECT_NAME;
- public static String concatResourcePath(String projectName) {
- return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + ".json";
- public String getResourcePath() {
- return concatResourcePath(name);
-- }
++ if (realizationEntries == null) {
++ realizationEntries = new ArrayList<RealizationEntry>();
++ }
- public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) {
- ProjectInstance projectInstance = new ProjectInstance();
- public static String concatResourcePath(String projectName) {
- return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + ".json";
- }
++ if (tables == null)
++ tables = new TreeSet<String>();
- projectInstance.updateRandomUuid();
- projectInstance.setName(name);
- projectInstance.setOwner(owner);
- projectInstance.setDescription(description);
- projectInstance.setStatus(ProjectStatusEnum.ENABLED);
- projectInstance.setCreateTimeUTC(System.currentTimeMillis());
- projectInstance.setOverrideKylinProps(overrideProps);
- public static String getNormalizedProjectName(String project) {
- if (project == null)
- throw new IllegalStateException("Trying to normalized a project name which is null");
++ if (overrideKylinProps == null) {
++ overrideKylinProps = new LinkedHashMap<>();
++ }
- if (realizationEntries != null)
- projectInstance.setRealizationEntries(realizationEntries);
- else
- projectInstance.setRealizationEntries(Lists.<RealizationEntry> newArrayList());
- if (models != null)
- projectInstance.setModels(models);
- else
- projectInstance.setModels(new ArrayList<String>());
- return projectInstance;
- return project.toUpperCase();
- }
++ initConfig();
+
- public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) {
- ProjectInstance projectInstance = new ProjectInstance();
++ if (StringUtils.isBlank(this.name))
++ throw new IllegalStateException("Project name must not be blank");
+ }
- public void initConfig() {
- projectInstance.updateRandomUuid();
- projectInstance.setName(name);
- projectInstance.setOwner(owner);
- projectInstance.setDescription(description);
- projectInstance.setStatus(ProjectStatusEnum.ENABLED);
- projectInstance.setCreateTimeUTC(System.currentTimeMillis());
- if (overrideProps != null) {
- projectInstance.setOverrideKylinProps(overrideProps);
- } else {
- projectInstance.setOverrideKylinProps(new LinkedHashMap<String, String>());
- }
- if (realizationEntries != null)
- projectInstance.setRealizationEntries(realizationEntries);
- else
- projectInstance.setRealizationEntries(Lists.<RealizationEntry> newArrayList());
- if (models != null)
- projectInstance.setModels(models);
- else
- projectInstance.setModels(new ArrayList<String>());
- return projectInstance;
++ private void initConfig() {
+ this.config = KylinConfigExt.createInstance(KylinConfig.getInstanceFromEnv(), this.overrideKylinProps);
}
-- // ============================================================================
++ public String getResourcePath() {
++ return concatResourcePath(resourceName());
++ }
-- public ProjectInstance() {
++ public static String concatResourcePath(String projectName) {
++ return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + ".json";
}
+ @Override
+ public String resourceName() {
+ return this.name;
+ }
+
public String getDescription() {
return description;
}
@@@ -310,31 -333,6 +329,10 @@@
return config;
}
+ public void setConfig(KylinConfigExt config) {
+ this.config = config;
+ }
+
- public void init() {
- if (name == null)
- name = ProjectInstance.DEFAULT_PROJECT_NAME;
-
- if (realizationEntries == null) {
- realizationEntries = new ArrayList<RealizationEntry>();
- }
-
- if (tables == null)
- tables = new TreeSet<String>();
-
- if (overrideKylinProps == null) {
- overrideKylinProps = new LinkedHashMap<>();
- }
-
- initConfig();
-
- if (StringUtils.isBlank(this.name))
- throw new IllegalStateException("Project name must not be blank");
- }
-
@Override
public String toString() {
return "ProjectDesc [name=" + name + "]";
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --cc core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index b910ffe,024990f..11ad8bb
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@@ -276,9 -255,44 +281,25 @@@ public abstract class GTCubeStorageQuer
}
}
}
-
- // expand derived
- Set<TblColRef> resultD = Sets.newHashSet();
- for (TblColRef col : result) {
- if (cubeDesc.isExtendedColumn(col)) {
- throw new CubeDesc.CannotFilterExtendedColumnException(col);
- }
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- if (hostInfo.isOneToOne) {
- for (TblColRef hostCol : hostInfo.columns) {
- resultD.add(hostCol);
- }
- }
- //if not one2one, it will be pruned
- } else {
- resultD.add(col);
- }
- }
- return resultD;
+ return result;
}
+ private long getQueryFilterMask(Set<TblColRef> filterColumnD) {
+ long filterMask = 0;
+
+ logger.info("Filter column set for query: " + filterColumnD.toString());
+ if (filterColumnD.size() != 0) {
+ RowKeyColDesc[] allColumns = cubeDesc.getRowkey().getRowKeyColumns();
+ for (int i = 0; i < allColumns.length; i++) {
+ if (filterColumnD.contains(allColumns[i].getColRef())) {
+ filterMask |= 1L << allColumns[i].getBitIndex();
+ }
+ }
+ }
+ logger.info("Filter mask is: " + filterMask);
+ return filterMask;
+ }
+
public boolean isNeedStorageAggregation(Cuboid cuboid, Collection<TblColRef> groupD,
Collection<TblColRef> singleValueD) {
HashSet<TblColRef> temp = Sets.newHashSet();
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 8e21f5c,8fbc0c9..faac724
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@@ -18,7 -18,9 +18,10 @@@
package org.apache.kylin.engine.mr;
-import org.apache.kylin.cube.CubeManager;
++import java.util.List;
++
import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@@ -57,7 -62,12 +61,12 @@@ public class BatchCubingJobBuilder2 ext
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+ result.addTask(createFactDistinctColumnsStep(jobId));
+
+ if (isEnableUHCDictStep()) {
+ result.addTask(createBuildUHCDictStep(jobId));
+ }
+
result.addTask(createBuildDictionaryStep(jobId));
result.addTask(createSaveStatisticsStep(jobId));
outputSide.addStepPhase2_BuildDictionary(result);
@@@ -75,8 -85,22 +84,22 @@@
return result;
}
+     /**
+      * Whether the job gets the step that builds UHC dictionaries with MR: this requires
+      * {@code isBuildUHCDictWithMREnabled()} and at least one column from {@code getAllUHCColumns()}.
+      */
+     private boolean isEnableUHCDictStep() {
+         boolean buildUhcDictWithMr = config.getConfig().isBuildUHCDictWithMREnabled();
+         if (buildUhcDictWithMr) {
- List<TblColRef> uhcColumns = CubeManager.getInstance(config.getConfig()).getAllUHCColumns(seg.getCubeDesc());
++            List<TblColRef> uhcColumns = seg.getCubeDesc().getAllUHCColumns();
+             return !uhcColumns.isEmpty();
+         }
+         return false;
+     }
+
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
- final int maxLevel = seg.getCuboidScheduler().getBuildLevel();
+ // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
+ final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
// base cuboid step
result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
// n dim cuboid steps
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 6393abf,abf7e02..6f26c35
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@@ -99,9 -142,11 +142,11 @@@ public class CubingJob extends DefaultC
format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
result.setDeployEnvName(kylinConfig.getDeployEnv());
result.setProjectName(projList.get(0).getName());
+ result.setJobType(jobType);
CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams());
CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setSegmentName(seg.getName(), result.getParams());
- result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() + " - " + seg.getName() + " - "
+ result.setName(jobType + " CUBE - " + seg.getCubeInstance().getDisplayName() + " - " + seg.getName() + " - "
+ format.format(new Date(System.currentTimeMillis())));
result.setSubmitter(submitter);
result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
@@@ -197,6 -259,44 +259,44 @@@
super.onExecuteFinished(result, executableContext);
}
+     /**
+      * Runs the parent status-change handling first, then reports metrics for the new state.
+      */
+     @Override
+     protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) {
+         super.onStatusChange(context, result, state);
+
+         updateMetrics(context, result, state);
+     }
+
+     /**
+      * Reports this job to {@link JobMetricsFacade}. On SUCCEED it records job-level statistics (and, for BUILD
+      * jobs, the durations of selected steps); on ERROR it records the failure cause, or a placeholder exception
+      * when the result carries none.
+      */
+     protected void updateMetrics(ExecutableContext context, ExecuteResult result, ExecutableState state) {
+         JobMetricsFacade.JobStatisticsResult jobStats = new JobMetricsFacade.JobStatisticsResult();
- jobStats.setWrapper(getSubmitter(), ProjectInstance.getNormalizedProjectName(getProjectName()),
++        jobStats.setWrapper(getSubmitter(), getProjectName(),
+                 CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
+                 getAlgorithm() == null ? "NULL" : getAlgorithm().toString());
+
+         if (state == ExecutableState.SUCCEED) {
+             jobStats.setJobStats(findSourceSizeBytes(), findCubeSizeBytes(), getDuration(), getMapReduceWaitTime(),
+                     getPerBytesTimeCost(findSourceSizeBytes(), getDuration()));
+             if (CubingJobTypeEnum.getByName(getJobType()) == CubingJobTypeEnum.BUILD) {
+                 // Look the steps up null-safely: dereferencing a missing step here used to throw an NPE
+                 // while handling the status change. NOTE(review): e.g. non-HBase storage may have no HFile step.
+                 jobStats.setJobStepStats(
+                         getTaskDurationByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS),
+                         getTaskDurationByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY),
+                         getTaskDurationByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE),
+                         getTaskDurationByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE));
+             }
+         } else if (state == ExecutableState.ERROR) {
+             jobStats.setJobException(result.getThrowable() != null ? result.getThrowable() : new Exception());
+         }
+         JobMetricsFacade.updateMetrics(jobStats);
+     }
+
+     /**
+      * Duration of the step named {@code stepName}, or 0 when {@code getTaskByName} finds no such step.
+      */
+     private long getTaskDurationByName(String stepName) {
+         // NOTE(review): assumes getTaskByName returns null when no step matches - confirm
+         return getTaskByName(stepName) == null ? 0L : getTaskByName(stepName).getDuration();
+     }
+
+     /**
+      * Time cost per source byte. Returns 0 for a non-positive size; a size smaller than {@code MIN_SOURCE_SIZE}
+      * is replaced by {@code MIN_SOURCE_SIZE} before dividing, which bounds the ratio for very small sources.
+      */
+     private static double getPerBytesTimeCost(long size, long timeCost) {
+         if (size > 0) {
+             long divisor = Math.max(size, MIN_SOURCE_SIZE);
+             return (double) timeCost / divisor;
+         }
+         return 0;
+     }
+
/**
* build fail because the metadata store has problem.
* @param exception
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 2c0f9f6,ade07e9..8925a8e
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@@ -462,7 -468,9 +471,12 @@@ public abstract class AbstractHadoopJo
}
}
+     /**
+      * Makes {@code conf} the current Hadoop configuration (via {@code HadoopUtil.setCurrentConfiguration}), then
+      * loads the Kylin config from {@code uri} with the single-argument overload.
+      */
+     public static KylinConfig loadKylinConfigFromHdfs(SerializableConfiguration conf, String uri) {
+         HadoopUtil.setCurrentConfiguration(conf.get());
++        KylinConfig loadedConfig = loadKylinConfigFromHdfs(uri);
++        return loadedConfig;
++    }
+
+ public static KylinConfig loadKylinConfigFromHdfs(String uri) {
if (uri == null)
throw new IllegalArgumentException("meta url should not be null");
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 52b6af5,50c589a..64163ad
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@@ -66,6 -68,9 +68,9 @@@ public interface BatchConstants
String CFG_OUTPUT_PARTITION = "partition";
String CFG_MR_SPARK_JOB = "mr.spark.job";
String CFG_SPARK_META_URL = "spark.meta.url";
+ String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
+
- String CFG_HLL_SHARD_BASE = "mapreduce.partition.hll.shard.base";
++ String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
/**
* command line ARGuments
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index fcd3162,8b9b928..3c054a3
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@@ -40,6 -40,6 +40,7 @@@ import org.apache.hadoop.io.SequenceFil
import org.apache.hadoop.io.SequenceFile.Reader.Option;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.common.KylinConfig;
++import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
@@@ -80,51 -80,50 +81,54 @@@ public class CubeStatsReader
final CuboidScheduler cuboidScheduler;
+     /**
+      * Reads the statistics of {@code cubeSegment}, using the segment's own cuboid scheduler.
+      */
public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
+ this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
+ }
+
+     /**
+      * Reads the statistics of {@code cubeSegment} from its statistics resource in the resource store.
+      *
+      * @param cuboidScheduler if it's null, part of its functions will not be supported
+      * @throws IllegalStateException if the statistics resource is missing
+      */
+     public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig)
+             throws IOException {
          ResourceStore store = ResourceStore.getStore(kylinConfig);
- cuboidScheduler = cubeSegment.getCuboidScheduler();
          String statsKey = cubeSegment.getStatisticsResourcePath();
-- File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream);
- Reader reader = null;
-
- try {
- Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
-
- Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
- Option seqInput = SequenceFile.Reader.file(path);
- reader = new SequenceFile.Reader(hadoopConf, seqInput);
-
- int percentage = 100;
- int mapperNumber = 0;
- double mapperOverlapRatio = 0;
- Map<Long, HLLCounter> counterMap = Maps.newHashMap();
-
- LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
- BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
- while (reader.next(key, value)) {
- if (key.get() == 0L) {
- percentage = Bytes.toInt(value.getBytes());
- } else if (key.get() == -1) {
- mapperOverlapRatio = Bytes.toDouble(value.getBytes());
- } else if (key.get() == -2) {
- mapperNumber = Bytes.toInt(value.getBytes());
- } else if (key.get() > 0) {
- HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
- counterMap.put(key.get(), hll);
- }
- }
-
- this.seg = cubeSegment;
- this.samplingPercentage = percentage;
- this.mapperNumberOfFirstBuild = mapperNumber;
- this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio;
- this.cuboidRowEstimatesHLL = counterMap;
++        RawResource resource = store.getResource(statsKey);
++        if (resource == null)
++            throw new IllegalStateException("Missing resource at " + statsKey);
++
++        File tmpSeqFile = writeTmpSeqFile(resource.inputStream);
+         Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
+
+         CubeStatsResult cubeStatsResult;
+         try {
+             cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+         } finally {
+             // remove the local temp copy even when reading the statistics fails
+             tmpSeqFile.delete();
+         }
+
+         this.seg = cubeSegment;
+         this.cuboidScheduler = cuboidScheduler;
+         this.samplingPercentage = cubeStatsResult.getPercentage();
+         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
+         this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
+         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+     }
- } finally {
- IOUtils.closeStream(reader);
- tmpSeqFile.delete();
- }
+     /**
+      * Reads the statistics from the file at {@code path} rather than from the statistics resource of
+      * {@code cubeSegment}. Since the statistics do not come from the segment itself, the matching cuboid
+      * scheduler has to be supplied by the caller.
+      *
+      * @param cubeSegment segment this reader is associated with
+      * @param cuboidScheduler cuboid scheduler to use with these statistics
+      * @param kylinConfig supplies the HLL precision ({@code getCubeStatsHLLPrecision()}) for reading the counters
+      * @param path location of the statistics file
+      */
+     public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path)
+             throws IOException {
+         CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+
+         this.seg = cubeSegment;
+         this.cuboidScheduler = cuboidScheduler;
+         this.samplingPercentage = cubeStatsResult.getPercentage();
+         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
+         this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
+         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
      }
private File writeTmpSeqFile(InputStream inputStream) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
index 0000000,1809ff0..0e56287
mode 000000,100644..100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
@@@ -1,0 -1,54 +1,60 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.engine.mr.common;
+
+ import java.io.IOException;
+ import java.util.Comparator;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.cube.cuboid.Cuboid;
+ import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+ import org.apache.kylin.cube.cuboid.CuboidScheduler;
+ import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
+
+ import com.google.common.collect.Lists;
+
+ /**
+  * Helpers for obtaining a {@link CuboidScheduler} for a segment: either the segment's current one, or one built
+  * over the cuboid set of a given {@link CuboidModeEnum}.
+  */
+ public class CuboidSchedulerUtil {
+
+     /**
+      * Resolves the mode by name; a {@code null} name yields the segment's current cuboid scheduler.
+      */
+     public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, String cuboidModeName) {
- return getCuboidSchedulerByMode(segment, CuboidModeEnum.getByModeName(cuboidModeName));
++        if (cuboidModeName == null) {
++            return segment.getCuboidScheduler();
++        }
++        return getCuboidSchedulerByMode(segment, CuboidModeEnum.getByModeName(cuboidModeName));
+     }
+
+     /**
+      * Returns the segment's current scheduler for {@code null} or {@link CuboidModeEnum#CURRENT}; otherwise builds
+      * a scheduler over the cube's cuboids for {@code cuboidMode}.
+      */
+     public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, CuboidModeEnum cuboidMode) {
- return getCuboidScheduler(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
++        if (cuboidMode == CuboidModeEnum.CURRENT || cuboidMode == null) {
++            return segment.getCuboidScheduler();
++        }
++        return getCuboidScheduler(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
+     }
+
+     /**
+      * Builds a {@link TreeCuboidScheduler} over {@code cuboidSet}. When row counts can be read from the segment's
+      * statistics, cuboids are compared with a {@link TreeCuboidScheduler.CuboidCostComparator} built from them;
+      * otherwise {@link Cuboid#cuboidSelectComparator} is used.
+      *
+      * @throws RuntimeException wrapping the {@link IOException} if the statistics cannot be read
+      */
+     public static CuboidScheduler getCuboidScheduler(CubeSegment segment, Set<Long> cuboidSet) {
+         try {
+             Map<Long, Long> cuboidsWithRowCnt = CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment);
+             Comparator<Long> comparator = cuboidsWithRowCnt == null ? Cuboid.cuboidSelectComparator
+                     : new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt);
+             return new TreeCuboidScheduler(segment.getCubeDesc(), Lists.newArrayList(cuboidSet), comparator);
+         } catch (IOException e) {
+             // chain the cause so the stack trace of the statistics read failure is not lost
+             throw new RuntimeException("Fail to read cube stats for segment " + segment, e);
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index ad13425,a230517..ad8b954
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@@ -59,23 -67,13 +70,17 @@@ public class JobInfoConverter
return null;
}
+ CubingJob cubeJob = (CubingJob) job;
-
+ CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
+ .getCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+
- Output output = outputs.get(job.getId());
final JobInstance result = new JobInstance();
result.setName(job.getName());
- if (cube != null) {
- result.setRelatedCube(cube.getDisplayName());
- } else {
- result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
- }
- result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
- result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
++ result.setRelatedCube(cube != null ? cube.getDisplayName() : CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+ result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
result.setLastModified(output.getLastModified());
- result.setSubmitter(cubeJob.getSubmitter());
- result.setUuid(cubeJob.getId());
+ result.setSubmitter(job.getSubmitter());
+ result.setUuid(job.getId());
result.setType(CubeBuildTypeEnum.BUILD);
result.setStatus(parseToJobStatus(output.getState()));
result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 0000000,b249f12..8fc26b4
mode 000000,100644..100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@@ -1,0 -1,131 +1,132 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.engine.mr.common;
+
+ import java.io.IOException;
+ import java.util.Map;
+
+ import org.apache.hadoop.mapreduce.Reducer;
+ import org.apache.kylin.common.KylinConfig;
++import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.cube.cuboid.CuboidScheduler;
+ import org.apache.kylin.cube.model.CubeDesc;
+ import org.apache.kylin.job.exception.JobException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+  * Helpers for sizing the reducer phase of the cubing MR jobs.
+  */
+ public class MapReduceUtil {
+
+     private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
+
+     /**
+      * Returns the reducer number for calculating cuboid HLL counters: for a non-empty cuboid set, one reducer per
+      * {@code getHadoopJobPerReducerHLLCuboidNumber()} cuboids (rounded up), capped at
+      * {@code getHadoopJobHLLMaxReducerNumber()}.
+      *
+      * @return reducer number for calculating hll
+      */
- public static int getHLLShardBase(CubeSegment segment) {
- int nCuboids = segment.getCuboidScheduler().getAllCuboidIds().size();
- int shardBase = (nCuboids - 1) / segment.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
++    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
++        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
++        int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
+
- int hllMaxReducerNumber = segment.getConfig().getHadoopJobHLLMaxReducerNumber();
++        int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
+         if (shardBase > hllMaxReducerNumber) {
+             shardBase = hllMaxReducerNumber;
+         }
+         return shardBase;
+     }
+
+     /**
+      * Estimates the reducer number for one step of layered cubing from the segment statistics. The estimate is
+      * scaled by the configured per-reducer input size and count ratio, then clamped to the configured min/max.
+      *
+      * @param cuboidScheduler scheduler used to read the statistics; specifying it allows a scheduler other
+      *                        than the segment's current one
+      * @param totalMapInputMB size of the step's map input, in MB
+      * @param level -1 for merge, 0 for the base cuboid, otherwise the layer whose size is estimated
+      */
+     public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
+             double totalMapInputMB, int level)
+             throws ClassNotFoundException, IOException, InterruptedException, JobException {
+         CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+         KylinConfig kylinConfig = cubeDesc.getConfig();
+
+         double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+         double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+         logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+                 + level);
+
+         CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+
+         double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+         if (level == -1) {
+             //merge case
+             double estimatedSize = cubeStatsReader.estimateCubeSize();
+             adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+             logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
+                     totalMapInputMB, adjustedCurrentLayerSizeEst);
+         } else if (level == 0) {
+             //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+             adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+             logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+         } else {
+             parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+             currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+             adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+             logger.debug(
+                     "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+                     totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+         }
+
+         // number of reduce tasks; kept as a double until clamped, because an oversized estimate (it can even be
+         // infinite when parentLayerSizeEst is 0) would otherwise wrap around when narrowed to int
+         double numReduceTasks = Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+         // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+         if (cubeDesc.hasMemoryHungryMeasures()) {
+             logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
+             numReduceTasks = numReduceTasks * 4;
+         }
+
+         return clampReducerNum(kylinConfig, numReduceTasks);
+     }
+
+     /**
+      * Estimates the reducer number for in-memory cubing from the total cuboid size reported by the statistics,
+      * clamped to the configured min/max.
+      */
+     public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
+             throws IOException {
+         KylinConfig kylinConfig = cubeSeg.getConfig();
+
+         Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+         double totalSizeInM = 0;
+         for (Double cuboidSize : cubeSizeMap.values()) {
+             totalSizeInM += cuboidSize;
+         }
+
+         double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+         double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+
+         // number of reduce tasks, clamped before narrowing to int
+         int numReduceTasks = clampReducerNum(kylinConfig,
+                 Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio));
+
+         logger.info("Having total map input MB " + Math.round(totalSizeInM));
+         logger.info("Having per reduce MB " + perReduceInputMB);
+         logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+         return numReduceTasks;
+     }
+
+     /**
+      * Clamps an estimated reducer number to [{@code getHadoopJobMinReducerNumber()},
+      * {@code getHadoopJobMaxReducerNumber()}]. Clamping happens before the narrowing cast, so estimates beyond the
+      * int range saturate at the maximum instead of overflowing.
+      */
+     private static int clampReducerNum(KylinConfig kylinConfig, double estimatedReducerNum) {
+         // at least 1 reducer by default
+         double atLeastMin = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), estimatedReducerNum);
+         // no more than 500 reducer by default
+         return (int) Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), atLeastMin);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index 0f7281f,4efcb96..09db8e9
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@@ -107,8 -101,30 +101,30 @@@ public class StatisticsDecisionUtil
return;
}
+ CubeInstance cube = segment.getCubeInstance();
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setCuboids(recommendCuboidsWithStats);
- CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder);
+ CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite());
+ update.setCuboids(recommendCuboidsWithStats);
+ CubeManager.getInstance(cube.getConfig()).updateCube(update);
}
+
+     /**
+      * Decides whether the cube planner may optimize the cuboid set of {@code segment}'s cube. This requires:
+      * <ul>
+      * <li>the cube planner to be enabled;</li>
+      * <li>no READY_PENDING segments;</li>
+      * <li>at most one NEW segment;</li>
+      * <li>either no READY segment, or, when enabled for existing cubes, exactly one READY segment whose
+      * range equals {@code segment}'s range.</li>
+      * </ul>
+      */
+     public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) {
+         CubeInstance cube = segment.getCubeInstance();
+         if (!cube.getConfig().isCubePlannerEnabled())
+             return false;
+
+         if (!cube.getSegments(SegmentStatusEnum.READY_PENDING).isEmpty()) {
+             logger.info("Has ready pending segments and will not enable cube planner.");
+             return false;
+         }
+         List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
+         List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW);
+         if (newSegments.size() > 1) {
+             return false;
+         }
+         if (readySegments.isEmpty()) {
+             return true;
+         }
+         return cube.getConfig().isCubePlannerEnabledForExistingCube() && readySegments.size() == 1
+                 && readySegments.get(0).getSegRange().equals(segment.getSegRange());
+     }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index 0000000,8f64272..f3bdabd
mode 000000,100644..100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@@ -1,0 -1,120 +1,120 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.engine.mr.steps;
+
+ import java.io.IOException;
+
+ import org.apache.commons.cli.Options;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.Job;
+ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.cube.CubeManager;
+ import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.apache.kylin.engine.mr.common.MapReduceUtil;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+  * Hadoop job that reads a segment's base cuboid data (sequence files) and computes statistics with
+  * {@link CalculateStatsFromBaseCuboidMapper} and {@link CalculateStatsFromBaseCuboidReducer}.
+  */
+ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
+
+     private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidJob.class);
+
+     @Override
+     public int run(String[] args) throws Exception {
+         Options options = new Options();
+
+         try {
+             options.addOption(OPTION_JOB_NAME);
+             options.addOption(OPTION_CUBE_NAME);
+             options.addOption(OPTION_SEGMENT_ID);
+             options.addOption(OPTION_INPUT_PATH);
+             options.addOption(OPTION_OUTPUT_PATH);
+             options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+             options.addOption(OPTION_CUBOID_MODE);
+             parseOptions(options, args);
+
+             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+             String cubeName = getOptionValue(OPTION_CUBE_NAME);
+             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+             Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+             String statisticsSamplingPercent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+             String cuboidMode = getOptionValue(OPTION_CUBOID_MODE);
+
+             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+             CubeInstance cube = cubeMgr.getCube(cubeName);
+             CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+
+             // Hadoop's Configuration.set rejects null values, so only propagate an explicitly given cuboid mode.
+             // NOTE(review): assumes readers fall back to the segment's current scheduler when the mode is absent,
+             // as CuboidSchedulerUtil.getCuboidSchedulerByMode(segment, (String) null) does - confirm.
+             if (cuboidMode != null) {
+                 job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidMode);
+             }
+             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statisticsSamplingPercent);
+             logger.info("Starting: " + job.getJobName());
+
+             setJobClasspath(job, cube.getConfig());
+
+             setupMapper(input);
+             setupReducer(output, cubeSegment);
+
+             attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration());
+
+             return waitForCompletion(job);
+
+         } catch (Exception e) {
+             logger.error("error in CalculateStatsFromBaseCuboidJob", e);
+             printUsage(options);
+             throw e;
+         } finally {
+             if (job != null)
+                 cleanupTempConfFile(job.getConfiguration());
+         }
+     }
+
+     /** Reads the sequence files under {@code input} with {@link CalculateStatsFromBaseCuboidMapper}. */
+     private void setupMapper(Path input) throws IOException {
+         FileInputFormat.setInputPaths(job, input);
+         job.setMapperClass(CalculateStatsFromBaseCuboidMapper.class);
+         job.setInputFormatClass(SequenceFileInputFormat.class);
+         job.setMapOutputKeyClass(Text.class);
+         job.setMapOutputValueClass(Text.class);
+     }
+
+     /**
+      * Uses as many reducers as {@link MapReduceUtil#getCuboidHLLCounterReducerNum} returns, publishes that number
+      * under {@code CFG_HLL_REDUCER_NUM}, and writes sequence files to a freshly cleared {@code output}.
+      */
+     private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
- int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg);
- job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, hllShardBase);
++        int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
++        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase);
+
+         job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
+         job.setOutputFormatClass(SequenceFileOutputFormat.class);
+         job.setOutputKeyClass(NullWritable.class);
+         job.setOutputValueClass(Text.class);
+         job.setNumReduceTasks(hllShardBase);
+
+         FileOutputFormat.setOutputPath(job, output);
+         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+         deletePath(job.getConfiguration(), output);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
index 0000000,70db21b..8b84844
mode 000000,100644..100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
@@@ -1,0 -1,59 +1,59 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.engine.mr.steps;
+
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.Partitioner;
+ import org.apache.kylin.common.util.Bytes;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
++ * Routes map output of the base-cuboid statistics job to reducers by cuboid id.
++ * The id's hash is reduced to one of CFG_HLL_REDUCER_NUM shards, and shards are
++ * assigned counting down from the last reducer.
+ */
+ public class CalculateStatsFromBaseCuboidPartitioner extends Partitioner<Text, Text> implements Configurable {
+ private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidPartitioner.class);
+
+ private Configuration conf;
++ // Number of shards; overwritten from BatchConstants.CFG_HLL_REDUCER_NUM in setConf.
+ private int hllShardBase = 1;
+
++ /**
++ * Maps a record to a reducer using only its key.
++ *
++ * @param key key bytes; Bytes.toLong is applied from offset 0 (assumption: the mapper
++ *            writes the cuboid id as the leading long of the key, TODO confirm)
++ * @param value not used
++ * @param numReduceTasks total reducers configured for the job
++ * @return {@code numReduceTasks - shard - 1}, where shard is in [0, hllShardBase)
++ */
+ @Override
+ public int getPartition(Text key, Text value, int numReduceTasks) {
+ Long cuboidId = Bytes.toLong(key.getBytes());
+ int shard = cuboidId.hashCode() % hllShardBase;
++ // Java's % keeps the sign of the dividend, so shift negative remainders into range.
+ if (shard < 0) {
+ shard += hllShardBase;
+ }
++ // NOTE(review): this is negative when hllShardBase > numReduceTasks, and the modulo
++ // above throws ArithmeticException if CFG_HLL_REDUCER_NUM is configured as 0.
++ // Math.floorMod(Long.hashCode(id), hllShardBase) would also avoid boxing the id.
+ return numReduceTasks - shard - 1;
+ }
+
++ /**
++ * Stores the configuration and reads the shard count from
++ * BatchConstants.CFG_HLL_REDUCER_NUM (default 1).
++ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
- hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1);
++ hllShardBase = conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1);
+ logger.info("shard base for hll is " + hllShardBase);
+ }
+
++ /** Returns the configuration last passed to setConf. */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 98ebbb4,d64d300..a457677
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@@ -72,7 -77,16 +77,16 @@@ public class CreateDictionaryJob extend
@Override
public Dictionary<String> getDictionary(TblColRef col) throws IOException {
- Path colDir = new Path(factColumnsInputPath, col.getIdentity());
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance cube = cubeManager.getCube(cubeName);
- List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor());
++ List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns();
+
+ Path colDir;
+ if (uhcColumns.contains(col)) {
+ colDir = new Path(dictPath, col.getIdentity());
+ } else {
+ colDir = new Path(factColumnsInputPath, col.getIdentity());
+ }
FileSystem fs = HadoopUtil.getWorkingFileSystem();
Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
index a1bba6e,e06077a..b48b19b
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@@ -38,7 -38,7 +38,8 @@@ import com.google.common.collect.Lists
public class CubingExecutableUtil {
public static final String CUBE_NAME = "cubeName";
+ public static final String DISPALY_NAME = "displayName";
+ public static final String SEGMENT_NAME = "segmentName";
public static final String SEGMENT_ID = "segmentId";
public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
public static final String STATISTICS_PATH = "statisticsPath";
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 5fcfe42,141ca99..9bede82
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@@ -18,25 -18,52 +18,62 @@@
package org.apache.kylin.engine.mr.steps;
++import java.io.IOException;
++
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
++import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
++import org.apache.kylin.cube.CubeInstance;
++import org.apache.kylin.cube.CubeManager;
++import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
/**
++ * Routes FactDistinctColumns map output to reducers based on the first byte of the key.
++ * HLL-counter and partition-column records are placed via FactDistinctColumnsReducerMapping;
++ * for any other key, the first byte read as an unsigned value is used directly as the reducer id.
*/
- public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortableKey, Text> {
+ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortableKey, Text> implements Configurable {
++ // NOTE(review): after this merge nothing in the resolved class references logger;
++ // the former setConf log line was removed.
+ private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnPartitioner.class);
+
+ private Configuration conf;
- private int hllShardBase = 1;
++ private FactDistinctColumnsReducerMapping reducerMapping;
++
++ /**
++ * Stores the configuration and builds the reducer mapping for the cube named by
++ * BatchConstants.CFG_CUBE_NAME. CFG_HLL_REDUCER_NUM (default 1) is passed to the mapping;
++ * FactDistinctColumnsJob sets that key from getCuboidRowCounterReducerNum().
++ *
++ * @throws RuntimeException wrapping the IOException if Kylin properties/metadata cannot be loaded
++ */
++ @Override
++ public void setConf(Configuration conf) {
++ this.conf = conf;
++
++ KylinConfig config;
++ try {
++ config = AbstractHadoopJob.loadKylinPropsAndMetadata();
++ } catch (IOException e) {
++ throw new RuntimeException(e);
++ }
++ String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
++ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
++ // NOTE(review): cube is not null-checked here; confirm FactDistinctColumnsReducerMapping
++ // fails with a clear error when cubeName is missing or unknown.
++
++ reducerMapping = new FactDistinctColumnsReducerMapping(cube,
++ conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
++ }
++ /**
++ * Picks the reducer for a key from its first byte.
++ *
++ * @return for MARK_FOR_HLL_COUNTER keys, the mapping's reducer for the cuboid id stored in
++ *         the Bytes.SIZEOF_LONG bytes after the marker; for MARK_FOR_PARTITION_COL keys, the
++ *         mapping's date-partition reducer; otherwise the first byte read as an unsigned value
++ */
@Override
public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
++ // numReduceTasks is not consulted: reducer ids come from reducerMapping or the key itself.
Text key = skey.getText();
-- if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) {
- // the last reducer is for merging hll
- return numReduceTasks - 1;
- } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
- // the last but one reducer is for partition col
- return numReduceTasks - 2;
- // the last $hllShard reducers are for merging hll
++ if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+ Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
- int shard = cuboidId.hashCode() % hllShardBase;
- if (shard < 0) {
- shard += hllShardBase;
- }
- return numReduceTasks - shard - 1;
- } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
- // the last but one reducer is for partition col
- return numReduceTasks - hllShardBase - 1;
++ return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
++ } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) {
++ return reducerMapping.getReducerIdForDatePartitionColumn();
} else {
return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
}
}
+
++ /** Returns the configuration last passed to setConf. */
+ @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1);
- logger.info("shard base for hll is " + hllShardBase);
- }
-
- @Override
+ public Configuration getConf() {
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index ac8ce26,5200950..cc4f260
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@@ -19,7 -19,7 +19,6 @@@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
--import java.util.Set;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
@@@ -42,7 -42,8 +41,6 @@@ import org.apache.kylin.engine.mr.IMRIn
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceUtil;
--import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -79,21 -82,21 +77,6 @@@ public class FactDistinctColumnsJob ext
// add metadata to distributed cache
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
-- Set<TblColRef> columnsNeedDict = cube.getDescriptor().getAllColumnsNeedDictionaryBuilt();
--
-- int reducerCount = columnsNeedDict.size();
-- int uhcReducerCount = cube.getConfig().getUHCReducerCount();
--
-- int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
-- for (int index : uhcIndex) {
-- if (index == 1) {
-- reducerCount += uhcReducerCount - 1;
-- }
-- }
--
-- if (reducerCount > 255) {
-- throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
-- }
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
@@@ -114,7 -118,7 +97,7 @@@
}
setupMapper(segment);
- setupReducer(output, reducerCount + 2);
- setupReducer(output, segment, statistics_enabled, reducerCount);
++ setupReducer(output, segment);
attachCubeMetadata(cube, job.getConfiguration());
@@@ -143,22 -147,29 +126,32 @@@
job.setMapOutputValueClass(Text.class);
}
- private void setupReducer(Path output, int numberOfReducers) throws IOException {
- private void setupReducer(Path output, CubeSegment cubeSeg, String statistics_enabled, int reducerCount)
++ private void setupReducer(Path output, CubeSegment cubeSeg)
+ throws IOException {
- int numberOfReducers = reducerCount;
- if ("true".equalsIgnoreCase(statistics_enabled)) {
- int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg);
- job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, hllShardBase);
- numberOfReducers += (1 + hllShardBase);
++ FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
++ int numberOfReducers = reducerMapping.getTotalReducerNum();
++ if (numberOfReducers > 250) {
++ throw new IllegalArgumentException(
++ "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
++ + numberOfReducers
++ + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
+ }
++
job.setReducerClass(FactDistinctColumnsReducer.class);
job.setPartitionerClass(FactDistinctColumnPartitioner.class);
job.setNumReduceTasks(numberOfReducers);
++ job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum());
-- //make each reducer output to respective dir
++ // make each reducer output to respective dir
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
--
FileOutputFormat.setOutputPath(job, output);
job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
-- //prevent to create zero-sized default output
++ // prevent to create zero-sized default output
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
deletePath(job.getConfiguration(), output);