You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/10/09 11:08:56 UTC
[1/2] kylin git commit: KYLIN-2030 bug fix
Repository: kylin
Updated Branches:
refs/heads/orderedbytes adf1369af -> afdec89fe
KYLIN-2030 bug fix
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b0aa327d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b0aa327d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b0aa327d
Branch: refs/heads/orderedbytes
Commit: b0aa327d23d635fada35b80bb149a2611ad689b5
Parents: adf1369
Author: Hongbin Ma <ma...@apache.org>
Authored: Sun Oct 9 19:07:52 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sun Oct 9 19:07:52 2016 +0800
----------------------------------------------------------------------
.../kylin/cube/CubeCapabilityChecker.java | 23 ++++++++++----------
.../kylin/query/relnode/OLAPAggregateRel.java | 22 +++++++++++--------
2 files changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b0aa327d/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index ee21b1c..e509d98 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -81,7 +81,7 @@ public class CubeCapabilityChecker {
//1. dimension as measure
if (!unmatchedAggregations.isEmpty()) {
- tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result, cube.getDescriptor().listDimensionColumnsIncludingDerived());
+ tryDimensionAsMeasures(unmatchedAggregations, result, cube.getDescriptor().listDimensionColumnsIncludingDerived());
}
} else {
//for non query-on-facttable
@@ -92,10 +92,18 @@ public class CubeCapabilityChecker {
dimCols.add(columnDesc.getRef());
}
- //1. dimension as measure, like max(cal_dt) or count( distinct col) from lookup
+ //1. all aggregations on lookup table can be done. For distinct count, mark them all DimensionAsMeasures
+ // so that the measure has a chance to be upgraded to DimCountDistinctMeasureType in org.apache.kylin.metadata.model.FunctionDesc#reInitMeasureType
if (!unmatchedAggregations.isEmpty()) {
- tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result, dimCols);
+ Iterator<FunctionDesc> itr = unmatchedAggregations.iterator();
+ while (itr.hasNext()) {
+ FunctionDesc functionDesc = itr.next();
+ if (dimCols.containsAll(functionDesc.getParameter().getColRefs())) {
+ itr.remove();
+ }
+ }
}
+ tryDimensionAsMeasures(Lists.newArrayList(aggrFunctions), result, dimCols);
//2. more "dimensions" contributed by snapshot
if (!unmatchedDimensions.isEmpty()) {
@@ -159,19 +167,12 @@ public class CubeCapabilityChecker {
return result;
}
- private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, CubeInstance cube, CapabilityResult result, Set<TblColRef> dimCols) {
- CubeDesc cubeDesc = cube.getDescriptor();
- Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
+ private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, CapabilityResult result, Set<TblColRef> dimCols) {
Iterator<FunctionDesc> it = unmatchedAggregations.iterator();
while (it.hasNext()) {
FunctionDesc functionDesc = it.next();
- if (cubeFuncs.contains(functionDesc)) {
- it.remove();
- continue;
- }
-
// let calcite handle count
if (functionDesc.isCount()) {
it.remove();
http://git-wip-us.apache.org/repos/asf/kylin/blob/b0aa327d/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 97efb27..c7a1eff 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -285,16 +285,20 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
}
private void translateAggregation() {
- // now the realization is known, replace aggregations with what's defined on MeasureDesc
- List<MeasureDesc> measures = this.context.realization.getMeasures();
- List<FunctionDesc> newAggrs = Lists.newArrayList();
- for (FunctionDesc aggFunc : this.aggregations) {
- newAggrs.add(findInMeasures(aggFunc, measures));
+ if (!noPrecaculatedFieldsAvailable()) {
+ // now the realization is known, replace aggregations with what's defined on MeasureDesc
+ List<MeasureDesc> measures = this.context.realization.getMeasures();
+ List<FunctionDesc> newAggrs = Lists.newArrayList();
+ for (FunctionDesc aggFunc : this.aggregations) {
+ newAggrs.add(findInMeasures(aggFunc, measures));
+ }
+ this.aggregations.clear();
+ this.aggregations.addAll(newAggrs);
+ this.context.aggregations.clear();
+ this.context.aggregations.addAll(newAggrs);
+ } else {
+ //the realization is not contributing pre-calculated fields at all
}
- this.aggregations.clear();
- this.aggregations.addAll(newAggrs);
- this.context.aggregations.clear();
- this.context.aggregations.addAll(newAggrs);
}
private FunctionDesc findInMeasures(FunctionDesc aggFunc, List<MeasureDesc> measures) {
[2/2] kylin git commit: KYLIN-1726 fix 'FileSystem Closed' error
Posted by ma...@apache.org.
KYLIN-1726 fix 'FileSystem Closed' error
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/afdec89f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/afdec89f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/afdec89f
Branch: refs/heads/orderedbytes
Commit: afdec89fe09dcb28b368775f8b830c78f74e7489
Parents: b0aa327
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 19:06:07 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sun Oct 9 19:08:41 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/kafka/UpdateTimeRangeStep.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/afdec89f/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
index bb64bf9..9e902d8 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -63,7 +64,12 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
final Path outputFile = new Path(outputPath, partitionCol.getName());
String minValue = null, maxValue = null, currentValue = null;
- try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+ FSDataInputStream inputStream = null;
+ BufferedReader bufferedReader = null;
+ try {
+ FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+ inputStream = fs.open(outputFile);
+ bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
minValue = currentValue = bufferedReader.readLine();
while (currentValue != null) {
maxValue = currentValue;
@@ -72,6 +78,9 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
} catch (IOException e) {
logger.error("fail to read file " + outputFile, e);
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ } finally {
+ IOUtils.closeQuietly(bufferedReader);
+ IOUtils.closeQuietly(inputStream);
}
final DataType partitionColType = partitionCol.getType();