Posted to commits@kylin.apache.org by ma...@apache.org on 2016/10/09 11:08:56 UTC

[1/2] kylin git commit: KYLIN-2030 bug fix

Repository: kylin
Updated Branches:
  refs/heads/orderedbytes adf1369af -> afdec89fe


KYLIN-2030 bug fix


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b0aa327d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b0aa327d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b0aa327d

Branch: refs/heads/orderedbytes
Commit: b0aa327d23d635fada35b80bb149a2611ad689b5
Parents: adf1369
Author: Hongbin Ma <ma...@apache.org>
Authored: Sun Oct 9 19:07:52 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sun Oct 9 19:07:52 2016 +0800

----------------------------------------------------------------------
 .../kylin/cube/CubeCapabilityChecker.java       | 23 ++++++++++----------
 .../kylin/query/relnode/OLAPAggregateRel.java   | 22 +++++++++++--------
 2 files changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b0aa327d/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index ee21b1c..e509d98 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -81,7 +81,7 @@ public class CubeCapabilityChecker {
             //1. dimension as measure
 
             if (!unmatchedAggregations.isEmpty()) {
-                tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result, cube.getDescriptor().listDimensionColumnsIncludingDerived());
+                tryDimensionAsMeasures(unmatchedAggregations, result, cube.getDescriptor().listDimensionColumnsIncludingDerived());
             }
         } else {
             //for non query-on-facttable 
@@ -92,10 +92,18 @@ public class CubeCapabilityChecker {
                     dimCols.add(columnDesc.getRef());
                 }
 
-                //1. dimension as measure, like max(cal_dt) or count( distinct col) from lookup
+                //1. all aggregations on lookup table can be done. For distinct count, mark them all DimensionAsMeasures
+                // so that the measure has a chance to be upgraded to DimCountDistinctMeasureType in org.apache.kylin.metadata.model.FunctionDesc#reInitMeasureType
                 if (!unmatchedAggregations.isEmpty()) {
-                    tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result, dimCols);
+                    Iterator<FunctionDesc> itr = unmatchedAggregations.iterator();
+                    while (itr.hasNext()) {
+                        FunctionDesc functionDesc = itr.next();
+                        if (dimCols.containsAll(functionDesc.getParameter().getColRefs())) {
+                            itr.remove();
+                        }
+                    }
                 }
+                tryDimensionAsMeasures(Lists.newArrayList(aggrFunctions), result, dimCols);
 
                 //2. more "dimensions" contributed by snapshot
                 if (!unmatchedDimensions.isEmpty()) {
@@ -159,19 +167,12 @@ public class CubeCapabilityChecker {
         return result;
     }
 
-    private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, CubeInstance cube, CapabilityResult result, Set<TblColRef> dimCols) {
-        CubeDesc cubeDesc = cube.getDescriptor();
-        Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
+    private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, CapabilityResult result, Set<TblColRef> dimCols) {
 
         Iterator<FunctionDesc> it = unmatchedAggregations.iterator();
         while (it.hasNext()) {
             FunctionDesc functionDesc = it.next();
 
-            if (cubeFuncs.contains(functionDesc)) {
-                it.remove();
-                continue;
-            }
-
             // let calcite handle count
             if (functionDesc.isCount()) {
                 it.remove();

http://git-wip-us.apache.org/repos/asf/kylin/blob/b0aa327d/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 97efb27..c7a1eff 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -285,16 +285,20 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
     }
 
     private void translateAggregation() {
-        // now the realization is known, replace aggregations with what's defined on MeasureDesc
-        List<MeasureDesc> measures = this.context.realization.getMeasures();
-        List<FunctionDesc> newAggrs = Lists.newArrayList();
-        for (FunctionDesc aggFunc : this.aggregations) {
-            newAggrs.add(findInMeasures(aggFunc, measures));
+        if (!noPrecaculatedFieldsAvailable()) {
+            // now the realization is known, replace aggregations with what's defined on MeasureDesc
+            List<MeasureDesc> measures = this.context.realization.getMeasures();
+            List<FunctionDesc> newAggrs = Lists.newArrayList();
+            for (FunctionDesc aggFunc : this.aggregations) {
+                newAggrs.add(findInMeasures(aggFunc, measures));
+            }
+            this.aggregations.clear();
+            this.aggregations.addAll(newAggrs);
+            this.context.aggregations.clear();
+            this.context.aggregations.addAll(newAggrs);
+        } else {
+            //the realization is not contributing pre-calculated fields at all
         }
-        this.aggregations.clear();
-        this.aggregations.addAll(newAggrs);
-        this.context.aggregations.clear();
-        this.context.aggregations.addAll(newAggrs);
     }
 
     private FunctionDesc findInMeasures(FunctionDesc aggFunc, List<MeasureDesc> measures) {
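
For the OLAPAggregateRel change above: the rewrite of aggregations onto cube-defined measures is now guarded, so that when the chosen realization contributes no pre-calculated fields the original aggregations pass through untouched. A minimal sketch of that guarded in-place rewrite follows; aggregations are modeled as plain strings and the method names are illustrative stand-ins, not Kylin's real signatures.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class TranslateAggregationSketch {

        // Stand-in for findInMeasures: map an aggregation onto the cube's definition if one matches.
        static String findInMeasures(String aggFunc, List<String> measures) {
            for (String m : measures) {
                if (m.equalsIgnoreCase(aggFunc)) {
                    return m; // use the form defined on the cube
                }
            }
            return aggFunc; // keep as-is when no measure matches
        }

        // Mirrors the guarded rewrite: only replace the aggregation list in place
        // when the realization actually offers pre-calculated fields.
        static void translateAggregation(List<String> aggregations, List<String> measures,
                                         boolean noPrecalculatedFields) {
            if (!noPrecalculatedFields) {
                List<String> newAggrs = new ArrayList<>();
                for (String aggFunc : aggregations) {
                    newAggrs.add(findInMeasures(aggFunc, measures));
                }
                aggregations.clear();
                aggregations.addAll(newAggrs);
            }
            // else: the realization contributes no pre-calculated fields, so the
            // aggregations are left for the query engine to compute directly
        }

        public static void main(String[] args) {
            List<String> aggs = new ArrayList<>(Arrays.asList("sum(PRICE)", "count()"));
            translateAggregation(aggs, Arrays.asList("SUM(PRICE)"), false);
            System.out.println(aggs); // [SUM(PRICE), count()]
        }
    }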


[2/2] kylin git commit: KYLIN-1726 fix 'FileSystem Closed' error

Posted by ma...@apache.org.
KYLIN-1726 fix 'FileSystem Closed' error


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/afdec89f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/afdec89f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/afdec89f

Branch: refs/heads/orderedbytes
Commit: afdec89fe09dcb28b368775f8b830c78f74e7489
Parents: b0aa327
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 19:06:07 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sun Oct 9 19:08:41 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/kafka/UpdateTimeRangeStep.java   | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/afdec89f/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
index bb64bf9..9e902d8 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -63,7 +64,12 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
         final Path outputFile = new Path(outputPath, partitionCol.getName());
 
         String minValue = null, maxValue = null, currentValue = null;
-        try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+        FSDataInputStream inputStream = null;
+        BufferedReader bufferedReader = null;
+        try {
+            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+            inputStream = fs.open(outputFile);
+            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
             minValue = currentValue = bufferedReader.readLine();
             while (currentValue != null) {
                 maxValue = currentValue;
@@ -72,6 +78,9 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
         } catch (IOException e) {
             logger.error("fail to read file " + outputFile, e);
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } finally {
+            IOUtils.closeQuietly(bufferedReader);
+            IOUtils.closeQuietly(inputStream);
         }
 
         final DataType partitionColType = partitionCol.getType();
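
Background on the fix above: Hadoop's FileSystem.get() normally returns a cached, JVM-wide instance per scheme and authority, so putting it in try-with-resources closes a FileSystem that other steps may still be using, which later surfaces as the "FileSystem Closed" error. The patch therefore closes only the streams it opened, in a finally block, and leaves the shared FileSystem alone. A minimal sketch of that pattern is below; the class name, method name, and the plain Configuration lookup are placeholders (Kylin goes through its own HadoopUtil.getFileSystem helper), and it only reads the first line rather than scanning for min/max.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadWithoutClosingFs {

        // Reads the first line of an HDFS file without closing the FileSystem.
        // FileSystem.get() hands back a cached instance shared across the JVM,
        // so closing it here would break other callers holding the same instance.
        public static String readFirstLine(Path file) throws IOException {
            FSDataInputStream in = null;
            BufferedReader reader = null;
            try {
                FileSystem fs = FileSystem.get(new Configuration()); // shared, cached instance
                in = fs.open(file);
                reader = new BufferedReader(new InputStreamReader(in));
                return reader.readLine();
            } finally {
                // close the streams we opened, but deliberately not fs
                IOUtils.closeQuietly(reader);
                IOUtils.closeQuietly(in);
            }
        }
    }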