You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/22 10:26:27 UTC

[1/2] incubator-kylin git commit: minor

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-942 e9067ef86 -> 9d256bd37


minor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e6f2e060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e6f2e060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e6f2e060

Branch: refs/heads/KYLIN-942
Commit: e6f2e060eab882460cefcd860b862e0f58ac1866
Parents: e9067ef
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 15:46:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 22 15:46:11 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/gridtable/GTRecord.java    |  5 ----
 .../kylin/query/test/ITKylinQueryTest.java      |  2 +-
 query/src/test/resources/query/sql/query01.sql  |  4 ++--
 query/src/test/resources/query/sql/query86.sql  | 24 ++++++++++++++++++++
 .../coprocessor/endpoint/CubeVisitService.java  |  1 -
 5 files changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e6f2e060/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index d54e223..5312308 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -264,11 +264,6 @@ public class GTRecord implements Comparable<GTRecord> {
     }
 
     /** change pointers to point to data in given buffer, UNLIKE deserialize */
-    public void loadPrimaryKey(ByteBuffer buf) {
-        loadColumns(info.primaryKey, buf);
-    }
-
-    /** change pointers to point to data in given buffer, UNLIKE deserialize */
     public void loadCellBlock(int c, ByteBuffer buf) {
         loadColumns(info.colBlocks[c], buf);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e6f2e060/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index a594585..a23f4ae 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query78.sql";
+        String queryFileName = "src/test/resources/query/sql/query86.sql";
 
         File sqlFile = new File(queryFileName);
         if (sqlFile.exists()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e6f2e060/query/src/test/resources/query/sql/query01.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query01.sql b/query/src/test/resources/query/sql/query01.sql
index 5a53058..9ed1db3 100644
--- a/query/src/test/resources/query/sql/query01.sql
+++ b/query/src/test/resources/query/sql/query01.sql
@@ -16,5 +16,5 @@
 -- limitations under the License.
 --
 
-select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact 
- group by LSTG_FORMAT_NAME 
+select LSTG_FORMAT_NAME,slr_segment_cd ,sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact 
+ group by LSTG_FORMAT_NAME ,slr_segment_cd

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e6f2e060/query/src/test/resources/query/sql/query86.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query86.sql b/query/src/test/resources/query/sql/query86.sql
new file mode 100644
index 0000000..f6feaaf
--- /dev/null
+++ b/query/src/test/resources/query/sql/query86.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select test_kylin_fact.cal_dt, count(*) as mmm from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id where lstg_format_name = 'Others'  group by test_kylin_fact.cal_dt order by test_kylin_fact.cal_dt 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e6f2e060/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 2b7c20c..c753911 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -133,7 +133,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 hbaseColumnsToGT.add(intList.getIntsList());
             }
 
-            //TODO: rewrite own start/end
             Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
 
             region = env.getRegion();


[2/2] incubator-kylin git commit: print stat and avoid loadallcolumns

Posted by ma...@apache.org.
print stat and avoid loadallcolumns


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9d256bd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9d256bd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9d256bd3

Branch: refs/heads/KYLIN-942
Commit: 9d256bd370d6d2b7a0e5b553fbd3a5faebc5d419
Parents: e6f2e06
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 16:30:38 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 22 16:30:38 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/util/CompressionUtils.java     |  6 ++--
 .../org/apache/kylin/gridtable/GTRecord.java    | 34 ------------------
 .../apache/kylin/query/test/KylinTestBase.java  |  2 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 37 ++++++++++++++++----
 .../hbase/cube/v2/HBaseReadonlyStore.java       |  5 ++-
 .../coprocessor/endpoint/CubeVisitService.java  | 12 ++++---
 6 files changed, 43 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
index 13abab5..c9838e4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
@@ -45,8 +45,7 @@ public class CompressionUtils {
         outputStream.close();
         byte[] output = outputStream.toByteArray();
 
-        logger.info("Original: " + data.length + " bytes");
-        logger.info("Compressed: " + output.length + " bytes");
+        logger.info("Original: " + data.length + " bytes. " + "Compressed: " + output.length + " bytes ");
         return output;
     }
 
@@ -63,8 +62,7 @@ public class CompressionUtils {
         outputStream.close();
         byte[] output = outputStream.toByteArray();
 
-        logger.info("Original: " + data.length + " bytes");
-        logger.info("Decompressed: " + output.length + " bytes");
+        logger.info("Original: " + data.length + " bytes. " + "Decompressed: " + output.length + " bytes");
         return output;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 5312308..0f4eb3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -228,22 +228,6 @@ public class GTRecord implements Comparable<GTRecord> {
         buf.setLength(pos);
     }
 
-    /**
-     * write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not.
-     * for saving space
-     */
-    public void exportAllColumns(ByteBuffer buf) {
-        for (int i = 0; i < info.colAll.trueBitCount(); i++) {
-            int c = info.colAll.trueBitAt(i);
-            if (cols[c] == null || cols[c].array() == null) {
-                buf.put((byte) 0);
-            } else {
-                buf.put((byte) 1);
-                buf.put(cols[c].array(), cols[c].offset(), cols[c].length());
-            }
-        }
-    }
-
     /** write data to given buffer, like serialize */
     public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
         for (int i = 0; i < selectedCols.trueBitCount(); i++) {
@@ -269,24 +253,6 @@ public class GTRecord implements Comparable<GTRecord> {
     }
 
     /** change pointers to point to data in given buffer, UNLIKE deserialize */
-    public void loadAllColumns(ByteBuffer buf) {
-        int pos = buf.position();
-        for (int i = 0; i < info.colAll.trueBitCount(); i++) {
-            int c = info.colAll.trueBitAt(i);
-
-            byte exist = buf.get();
-            pos++;
-
-            if (exist == 1) {
-                int len = info.codeSystem.codeLength(c, buf);
-                cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
-                pos += len;
-                buf.position(pos);
-            }
-        }
-    }
-
-    /** change pointers to point to data in given buffer, UNLIKE deserialize */
     public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
         int pos = buf.position();
         for (int i = 0; i < selectedCols.trueBitCount(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
index b94542c..0399f8c 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
@@ -208,7 +208,7 @@ public class KylinTestBase {
         if (needSort) {
             queryTable = new SortedTable(queryTable, columnNames);
         }
-        printResult(queryTable);
+        //printResult(queryTable);
 
         return queryTable;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 5a91537..bb2a18a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -48,6 +48,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
 import org.apache.kylin.storage.hbase.steps.HBaseConnection;
 
 import com.google.common.base.Function;
@@ -61,10 +62,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
     static class EndpointResultsAsGTScanner implements IGTScanner {
         private GTInfo info;
         private Iterator<byte[]> blocks;
+        private ImmutableBitSet columns;
 
-        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks) {
+        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns) {
             this.info = info;
             this.blocks = blocks;
+            this.columns = columns;
         }
 
         @Override
@@ -89,7 +92,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 @Override
                 public Iterator<GTRecord> apply(@Nullable final byte[] input) {
 
-                    logger.info("Reassembling a raw block returned from Endpoint with byte length: " + input.length);
                     return new Iterator<GTRecord>() {
                         private ByteBuffer inputBuffer = null;
                         private GTRecord oneRecord = null;
@@ -106,7 +108,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
                         @Override
                         public GTRecord next() {
-                            oneRecord.loadAllColumns(inputBuffer);
+                            oneRecord.loadColumns(columns, inputBuffer);
                             return oneRecord;
                         }
 
@@ -151,7 +153,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
         }
 
-        for (final RawScan rawScan : rawScans) {
+        for (int i = 0; i < rawScans.size(); ++i) {
+            final int shardIndex = i;
+            final RawScan rawScan = rawScans.get(i);
+
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
@@ -169,6 +174,15 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                         throw new RuntimeException("Error when visiting cubes by endpoint:", throwable);
                     }
 
+                    //results.size() supposed to be 1;
+                    if (results.size() != 1) {
+                        logger.warn("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex);
+                    }
+
+                    for (CubeVisitProtos.CubeVisitResponse result : results) {
+                        logger.info(getStatsString(result, shardIndex));
+                    }
+
                     Collection<byte[]> part = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
                         @Nullable
                         @Override
@@ -193,7 +207,18 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             throw new RuntimeException("Visiting cube by endpoint gets interrupted");
         }
 
-        return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator());
+        return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns());
+    }
+
+    private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) {
+        StringBuilder sb = new StringBuilder();
+        Stats stats = result.getStats();
+        sb.append("Shard " + shardIndex + ": ");
+        sb.append("Total scanned row: " + stats.getScannedRowCount() + ". ");
+        sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". ");
+        sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). ");
+        return sb.toString();
+
     }
 
     //TODO : async callback ?
@@ -211,8 +236,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             }
         });
 
-        logger.info("{} regions returned results ", results.values().size());
-
         return results.values();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index d0551bb..7731f19 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -100,16 +100,15 @@ public class HBaseReadonlyStore implements IGTStore {
 
                     @Override
                     public GTRecord next() {
+                        count++;
                         List<Cell> oneRow = cellListIterator.next();
                         if (oneRow.size() < 1) {
                             throw new IllegalStateException("cell list's size less than 1");
                         }
 
-                        ByteBuffer buf;
-
                         // dimensions, set to primary key, also the 0th column block
                         Cell firstCell = oneRow.get(0);
-                        buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN);
+                        ByteBuffer buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN);
                         oneRecord.loadCellBlock(0, buf);
 
                         // metrics

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index c753911..5e14474 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -146,12 +146,16 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner);
 
             ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
+            int finalRowCount = 0;
             for (GTRecord oneRecord : finalScanner) {
                 buffer.clear();
-                oneRecord.exportAllColumns(buffer);
+                oneRecord.exportColumns(scanReq.getColumns(), buffer);
                 buffer.flip();
+
                 outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
+                finalRowCount++;
             }
             //outputStream.close() is not necessary
             byte[] allRows = outputStream.toByteArray();
@@ -159,8 +163,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             done.run(responseBuilder.//
                     setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies 
                     setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
-                            setAggregatedRowCount(0).//
-                            setScannedRowCount(0).//
+                            setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
+                            setScannedRowCount(finalScanner.getScannedRowCount()).//
                             setServiceStartTime(serviceStartTime).//
                             setServiceEndTime(System.currentTimeMillis()).build()).//
                     build());