You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ga...@apache.org on 2017/02/12 01:35:07 UTC
[2/2] kylin git commit: KYLIN-2443 Report coprocessor error information back to client
KYLIN-2443 Report coprocessor error information back to client
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/43c05667
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/43c05667
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/43c05667
Branch: refs/heads/master
Commit: 43c0566728092d537201d751d3e8f6e3c0d5f051
Parents: 707b055
Author: gaodayue <ga...@meituan.com>
Authored: Sat Feb 11 19:13:24 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Sun Feb 12 09:34:49 2017 +0800
----------------------------------------------------------------------
build/smoke-test/sql/sql1.json | 1 -
build/smoke-test/testQuery.py | 1 +
.../org/apache/kylin/common/QueryContext.java | 21 +-
.../exceptions/KylinTimeoutException.java | 26 +
.../ResourceLimitExceededException.java | 30 +
.../kylin/gridtable/GTAggregateScanner.java | 12 +-
.../GTScanExceedThresholdException.java | 26 -
.../GTScanSelfTerminatedException.java | 30 -
.../kylin/gridtable/GTScanTimeoutException.java | 26 -
.../apache/kylin/storage/StorageContext.java | 19 +-
.../storage/gtrecord/CubeSegmentScanner.java | 2 +-
.../kylin/storage/gtrecord/ScannerWorker.java | 5 +-
.../gtrecord/SequentialCubeTupleIterator.java | 6 +-
.../apache/kylin/query/ITKylinQueryTest.java | 6 +-
.../apache/kylin/rest/service/QueryService.java | 28 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 69 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 8 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 15 +-
.../coprocessor/endpoint/CubeVisitService.java | 42 +-
.../endpoint/generated/CubeVisitProtos.java | 1254 ++++++++++++++++--
.../endpoint/protobuf/CubeVisit.proto | 12 +-
21 files changed, 1331 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/build/smoke-test/sql/sql1.json
----------------------------------------------------------------------
diff --git a/build/smoke-test/sql/sql1.json b/build/smoke-test/sql/sql1.json
index 7cb3258..21e4c01 100644
--- a/build/smoke-test/sql/sql1.json
+++ b/build/smoke-test/sql/sql1.json
@@ -9,7 +9,6 @@
]
],
"exceptionMessage": null,
- "totalScanCount": 1,
"columnMetas": [
{
"scale": 0,
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/build/smoke-test/testQuery.py
----------------------------------------------------------------------
diff --git a/build/smoke-test/testQuery.py b/build/smoke-test/testQuery.py
index 87a2456..99c09d3 100644
--- a/build/smoke-test/testQuery.py
+++ b/build/smoke-test/testQuery.py
@@ -59,6 +59,7 @@ class testQuery(unittest.TestCase):
del actual_result['duration']
del actual_result['hitExceptionCache']
del actual_result['storageCacheUsed']
+ del actual_result['totalScanCount']
del actual_result['totalScanBytes']
expect_result = json.loads(open(sql_file[:-4] + '.json').read().strip())
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 3a73993..67925b6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -21,7 +21,7 @@ package org.apache.kylin.common;
import java.util.concurrent.atomic.AtomicLong;
/**
- * checkout {@link org.apache.kylin.common.debug.BackdoorToggles} for comparison
+ * Holds per query information and statistics.
*/
public class QueryContext {
@@ -33,7 +33,8 @@ public class QueryContext {
};
private String queryId;
- private AtomicLong scanBytes = new AtomicLong();
+ private AtomicLong scannedRows = new AtomicLong();
+ private AtomicLong scannedBytes = new AtomicLong();
private QueryContext() {
// use QueryContext.current() instead
@@ -55,11 +56,19 @@ public class QueryContext {
this.queryId = queryId;
}
- public long getScanBytes() {
- return scanBytes.get();
+ public long getScannedRows() {
+ return scannedRows.get();
}
- public long addAndGetScanBytes(long delta) {
- return scanBytes.addAndGet(delta);
+ public long addAndGetScannedRows(long deltaRows) {
+ return scannedRows.addAndGet(deltaRows);
+ }
+
+ public long getScannedBytes() {
+ return scannedBytes.get();
+ }
+
+ public long addAndGetScannedBytes(long deltaBytes) {
+ return scannedBytes.addAndGet(deltaBytes);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java b/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java
new file mode 100644
index 0000000..75d981f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.exceptions;
+
+public class KylinTimeoutException extends RuntimeException {
+
+ public KylinTimeoutException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java b/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java
new file mode 100644
index 0000000..df5d88e
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.exceptions;
+
+public class ResourceLimitExceededException extends RuntimeException {
+
+ public ResourceLimitExceededException(String message) {
+ super(message);
+ }
+
+ public ResourceLimitExceededException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index dd359f8..8b0efcc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -35,6 +35,7 @@ import java.util.SortedMap;
import java.util.Map.Entry;
import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -278,14 +279,11 @@ public class GTAggregateScanner implements IGTScanner {
final long estMemSize = estimatedMemSize();
if (spillThreshold > 0 && estMemSize > spillThreshold) {
- // spill to disk when aggBufMap used too large memory
- if (spillEnabled) {
- spillBuffMap(estMemSize);
- aggBufMap = createBuffMap();
-
- } else {
- throw new GTScanSelfTerminatedException("Aggregation using more than " + spillThreshold + " memory and spill is disabled");
+ if (!spillEnabled) {
+ throw new ResourceLimitExceededException("aggregation's memory consumption " + estMemSize + " exceeds threshold " + spillThreshold);
}
+ spillBuffMap(estMemSize); // spill to disk
+ aggBufMap = createBuffMap();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
deleted file mode 100644
index ba75962..0000000
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.gridtable;
-
-public class GTScanExceedThresholdException extends GTScanSelfTerminatedException {
-
- public GTScanExceedThresholdException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java
deleted file mode 100644
index 30d3aaa..0000000
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.gridtable;
-
-/**
- * Implementations of {@link IGTScanner} should throw {@link GTScanSelfTerminatedException} or its subclasses
- * in cases where the scan runs out of resources (time, memory, etc) and can not be continued.
- */
-public class GTScanSelfTerminatedException extends RuntimeException {
-
- public GTScanSelfTerminatedException(String s) {
- super(s);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
deleted file mode 100644
index 17a8d02..0000000
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.gridtable;
-
-public class GTScanTimeoutException extends GTScanSelfTerminatedException {
-
- public GTScanTimeoutException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 0f52c53..4713d71 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -47,8 +47,7 @@ public class StorageContext {
private boolean enableCoprocessor = false;
private IStorageQuery storageQuery;
- private AtomicLong totalScanCount = new AtomicLong();
- private AtomicLong totalScanBytes = new AtomicLong();
+ private AtomicLong processedRowCount = new AtomicLong();
private Cuboid cuboid;
private boolean partialResultReturned = false;
@@ -140,20 +139,12 @@ public class StorageContext {
return cuboid;
}
- public long getTotalScanCount() {
- return totalScanCount.get();
+ public long getProcessedRowCount() {
+ return processedRowCount.get();
}
- public long increaseTotalScanCount(long count) {
- return this.totalScanCount.addAndGet(count);
- }
-
- public long getTotalScanBytes() {
- return totalScanBytes.get();
- }
-
- public long increaseTotalScanBytes(long bytes) {
- return totalScanBytes.addAndGet(bytes);
+ public long increaseProcessedRowCount(long count) {
+ return processedRowCount.addAndGet(count);
}
public boolean isAcceptPartialResult() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 974b8ea..029502c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner {
}
scanRequest = scanRangePlanner.planScanRequest();
String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
- scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
+ scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
index 2a2a86a..fd50c54 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
@@ -26,7 +26,6 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStorage;
import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +38,7 @@ public class ScannerWorker {
private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
private IGTScanner internal = null;
- public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) {
+ public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
if (scanRequest == null) {
logger.info("Segment {} will be skipped", segment);
internal = new EmptyGTScanner(0);
@@ -49,7 +48,7 @@ public class ScannerWorker {
final GTInfo info = scanRequest.getInfo();
try {
- IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior
+ IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior
internal = rpc.getGTScanner(scanRequest);
} catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index bb2d7f9..14b6394 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -26,8 +26,8 @@ import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTScanTimeoutException;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -139,7 +139,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
@Override
public ITuple next() {
if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) {
- throw new GTScanTimeoutException("Query Timeout!");
+ throw new KylinTimeoutException("Query timeout after \"kylin.query.timeout-seconds\" seconds");
}
if (++scanCountDelta >= 1000)
@@ -173,7 +173,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
}
private void flushScanCountDelta() {
- context.increaseTotalScanCount(scanCountDelta);
+ context.increaseProcessedRowCount(scanCountDelta);
scanCountDelta = 0;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 87ddcb8..4590e60 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.query.routing.Candidate;
@@ -124,12 +124,12 @@ public class ITKylinQueryTest extends KylinTestBase {
System.out.println(e.getMessage());
- if (findRoot(e) instanceof GTScanSelfTerminatedException) {
+ if (findRoot(e) instanceof KylinTimeoutException) {
//expected
continue;
}
}
- throw new RuntimeException("Expecting GTScanTimeoutException");
+ throw new RuntimeException("Expecting KylinTimeoutException");
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 7d9e24d..4c02aa4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -57,13 +57,13 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.DBUtils;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTScanExceedThresholdException;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.query.relnode.OLAPContext;
@@ -328,10 +328,12 @@ public class QueryService extends BasicService {
throw new InternalErrorException("Project cannot be empty. Please select a project.");
}
- final String queryId = UUID.randomUUID().toString();
if (sqlRequest.getBackdoorToggles() != null)
BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
- QueryContext.current().setQueryId(queryId);
+
+ final QueryContext queryContext = QueryContext.current();
+ final String queryId = UUID.randomUUID().toString();
+ queryContext.setQueryId(queryId);
try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
String sql = sqlRequest.getSql();
@@ -372,6 +374,8 @@ public class QueryService extends BasicService {
} else {
sqlResponse.setDuration(System.currentTimeMillis() - startTime);
+ sqlResponse.setTotalScanCount(0);
+ sqlResponse.setTotalScanBytes(0);
}
checkQueryAuth(sqlResponse);
@@ -381,9 +385,10 @@ public class QueryService extends BasicService {
String errMsg = QueryUtil.makeErrorMsgUserFriendly(e);
sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
+ sqlResponse.setTotalScanCount(queryContext.getScannedRows());
+ sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
- // for exception queries, only cache ScanOutOfLimitException
- if (queryCacheEnabled && e instanceof GTScanExceedThresholdException) {
+ if (queryCacheEnabled && e.getCause() != null && e.getCause() instanceof ResourceLimitExceededException) {
Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
exceptionCache.put(new Element(sqlRequest, sqlResponse));
}
@@ -582,26 +587,21 @@ public class QueryService extends BasicService {
boolean isPartialResult = false;
String cube = "";
- StringBuilder sb = new StringBuilder("Scan stats for each storageContext: ");
- long totalScanCount = 0;
- long totalScanBytes = 0;
+ StringBuilder sb = new StringBuilder("Processed rows for each storageContext: ");
if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
if (ctx.realization != null) {
isPartialResult |= ctx.storageContext.isPartialResultReturned();
cube = ctx.realization.getName();
- totalScanCount += ctx.storageContext.getTotalScanCount();
- totalScanBytes += ctx.storageContext.getTotalScanBytes();
- sb.append("{rows=").append(ctx.storageContext.getTotalScanCount()).
- append(" bytes=").append(ctx.storageContext.getTotalScanBytes()).append("} ");
+ sb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
}
}
}
logger.info(sb.toString());
SQLResponse response = new SQLResponse(columnMetas, results, cube, 0, false, null, isPartialResult);
- response.setTotalScanCount(totalScanCount);
- response.setTotalScanBytes(totalScanBytes);
+ response.setTotalScanCount(QueryContext.current().getScannedRows());
+ response.setTotalScanBytes(QueryContext.current().getScannedBytes());
return response;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 68a84c1..3c01da2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;
import org.apache.hadoop.hbase.TableName;
@@ -33,7 +34,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
@@ -43,12 +45,9 @@ import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTScanExceedThresholdException;
import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -71,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private static ExecutorService executorService = new LoggableCachedThreadPool();
- public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
- super(segment, cuboid, fullGTInfo, context);
+ public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
+ super(segment, cuboid, fullGTInfo);
}
private byte[] getByteArrayForShort(short v) {
@@ -107,8 +106,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@SuppressWarnings("checkstyle:methodlength")
@Override
public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
- final QueryContext queryContext = QueryContext.current();
-
Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
short shardNum = shardNumAndBaseShard.getFirst();
short cuboidBaseShard = shardNumAndBaseShard.getSecond();
@@ -175,7 +172,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
public void run() {
final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
- final boolean[] abnormalFinish = new boolean[1];
+ final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>();
try {
Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
@@ -199,22 +196,32 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}, new Batch.Callback<CubeVisitResponse>() {
@Override
public void update(byte[] region, byte[] row, CubeVisitResponse result) {
- if (region == null)
+ if (region == null) {
return;
+ }
- final long scanBytes = result.getStats().getScannedBytes();
- context.increaseTotalScanBytes(scanBytes);
- totalScannedCount.addAndGet(result.getStats().getScannedRowCount());
logger.info(logHeader + getStatsString(region, result));
- if (queryContext.addAndGetScanBytes(scanBytes) > cubeSeg.getConfig().getQueryMaxScanBytes()) {
- throw new GTScanExceedThresholdException("Query scanned " + queryContext.getScanBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
+ Stats stats = result.getStats();
+ queryContext.addAndGetScannedRows(stats.getScannedRowCount());
+ queryContext.addAndGetScannedBytes(stats.getScannedBytes());
+ totalScannedCount.addAndGet(stats.getScannedRowCount());
+
+ // if any other region has responded with error, skip further processing
+ if (regionErrorHolder.get() != null) {
+ return;
}
+ // record coprocessor error if happened
if (result.getStats().getNormalComplete() != 1) {
- abnormalFinish[0] = true;
+ regionErrorHolder.compareAndSet(null, getCoprocessorException(result));
return;
}
+
+ if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
+ throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
+ }
+
try {
if (compressionResult) {
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
@@ -233,11 +240,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return;
}
- if (abnormalFinish[0]) {
- Throwable ex = new GTScanSelfTerminatedException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query...");
- logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
- epResultItr.notifyCoprocException(ex);
- return;
+ if (regionErrorHolder.get() != null) {
+ RuntimeException exception = regionErrorHolder.get();
+ logger.error(logHeader + "Error when visiting cubes by endpoint", exception); // double log coz the query thread may already timeout
+ epResultItr.notifyCoprocException(exception);
}
}
});
@@ -288,6 +294,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private String getStatsString(byte[] region, CubeVisitResponse result) {
StringBuilder sb = new StringBuilder();
Stats stats = result.getStats();
+ byte[] compressedRows = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
+
sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". ");
@@ -296,8 +304,27 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
+ sb.append("Compressed row size: ").append(compressedRows.length);
return sb.toString();
}
+ private RuntimeException getCoprocessorException(CubeVisitResponse response) { // translates a failed coprocessor response into the exception to report; the RPC callback calls this when getNormalComplete() != 1
+ if (!response.hasErrorInfo()) { // no structured error attached; presumably the region server runs a coprocessor build without ErrorInfo support (the message asks for a re-deploy) - confirm
+ return new RuntimeException("Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message");
+ }
+ // from here on the response carries an ErrorInfo; its type selects the exception class
+ CubeVisitResponse.ErrorInfo errorInfo = response.getErrorInfo();
+
+ switch (errorInfo.getType()) {
+ case UNKNOWN_TYPE: // untyped failure: plain RuntimeException carrying the coprocessor's message
+ return new RuntimeException("Coprocessor aborts: " + errorInfo.getMessage());
+ case TIMEOUT: // message is passed through unchanged
+ return new KylinTimeoutException(errorInfo.getMessage());
+ case RESOURCE_LIMIT_EXCEEDED: // message is prefixed to mark the limit as coprocessor-side
+ return new ResourceLimitExceededException("Coprocessor resource limit exceeded: " + errorInfo.getMessage());
+ default: // NOTE(review): thrown, not returned - it propagates to the caller instead of being handed back like the other cases
+ throw new AssertionError("Unknown error type: " + errorInfo.getType());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 11fbb19..f24290c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -48,7 +49,6 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.IGTStorage;
-import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,18 +63,18 @@ public abstract class CubeHBaseRPC implements IGTStorage {
final protected CubeSegment cubeSeg;
final protected Cuboid cuboid;
final protected GTInfo fullGTInfo;
- final protected StorageContext context;
+ final protected QueryContext queryContext;
final private RowKeyEncoder fuzzyKeyEncoder;
final private RowKeyEncoder fuzzyMaskEncoder;
- public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
+ public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment");
this.cubeSeg = (CubeSegment) segment;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
- this.context = context;
+ this.queryContext = QueryContext.current();
this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index b94346c..1698180 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -42,7 +42,6 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,8 +87,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
}
- public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) {
- super(segment, cuboid, fullGTInfo, context);
+ public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) { // the StorageContext parameter is removed by this change
+ super(segment, cuboid, fullGTInfo); // the CubeHBaseRPC constructor now assigns queryContext from QueryContext.current() instead of receiving a context argument
}
@Override
@@ -182,15 +181,18 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
CellListIterator cellListIterator = new CellListIterator() {
- long scanBytes = 0;
+ long scannedRows = 0;
+ long scannedBytes = 0;
@Override
public void close() throws IOException { // reports this iterator's scan counters, then closes the scanners and the table
+ queryContext.addAndGetScannedRows(scannedRows); // add the rows counted by this iterator to the query-level total
+ queryContext.addAndGetScannedBytes(scannedBytes); // add the bytes counted by this iterator to the query-level total
+ // counters are reported before any close() call below; the removed code reported bytes only after hbaseTable.close()
for (ResultScanner scanner : scanners) { // close every ResultScanner held in scanners
scanner.close();
}
hbaseTable.close();
- context.increaseTotalScanBytes(scanBytes);
}
@Override
@@ -202,8 +204,9 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
public List<Cell> next() { // returns the cells of the next Result and updates the scanned byte/row counters
List<Cell> result = allResultsIterator.next().listCells(); // NOTE(review): HBase documents that Result.listCells() may return null for an empty Result - confirm empty results cannot reach this point
for (Cell cell : result) {
- scanBytes += CellUtil.estimatedSizeOf(cell);
+ scannedBytes += CellUtil.estimatedSizeOf(cell); // accumulate the estimated size of each returned cell
}
+ scannedRows++; // one Result counts as one scanned row
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5fd9740..e18ff0d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -31,7 +31,6 @@ import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
@@ -44,16 +43,15 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanExceedThresholdException;
import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
-import org.apache.kylin.gridtable.GTScanTimeoutException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -165,13 +163,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
@Override
public boolean hasNext() { // enforces the row-count, byte and time limits before delegating to the wrapped iterator
if (rowCount > rowCountLimit) { // row-count limit exceeded: abort the scan
- throw new GTScanExceedThresholdException("Number of rows scanned exceeds threshold " + rowCountLimit);
+ throw new ResourceLimitExceededException("scanned row count exceeds threshold " + rowCountLimit);
}
if (rowBytes > bytesLimit) { // byte limit exceeded: abort the scan
- throw new GTScanExceedThresholdException("Scanned " + rowBytes + " bytes exceeds threshold " + bytesLimit);
+ throw new ResourceLimitExceededException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
}
if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) { // the modulo test comes first, so the clock is read only when rowCount % terminateCheckInterval == 1
- throw new GTScanTimeoutException("Scan timeout after " + timeout + " ms");
+ throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms");
}
return delegate.hasNext(); // no limit hit: defer to the underlying iterator
}
@@ -232,6 +230,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
byte[] allRows;
String debugGitTag = "";
+ CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = null;
+
String queryId = request.hasQueryId() ? request.getQueryId() : "UnknownId";
try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
this.serviceStartTime = System.currentTimeMillis();
@@ -292,7 +292,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
scanReq.disableAggCacheMemCheck(); // disable mem check if so told
}
- final MutableBoolean scanNormalComplete = new MutableBoolean(true);
final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(
@@ -332,11 +331,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
break;
}
}
- } catch (GTScanSelfTerminatedException e) {
- // the query is using too much resource, we mark it as abnormal finish instead of
- // throwing RuntimeException to avoid client retrying RPC.
- scanNormalComplete.setValue(false);
- logger.warn("Abort scan: {}", e.getMessage());
+ } catch (KylinTimeoutException e) {
+ logger.info("Abort scan: {}", e.getMessage());
+ errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
+ .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT)
+ .setMessage(e.getMessage())
+ .build();
+ } catch (ResourceLimitExceededException e) {
+ logger.info("Abort scan: {}", e.getMessage());
+ errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
+ .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.RESOURCE_LIMIT_EXCEEDED)
+ .setMessage(e.getMessage())
+ .build();
} finally {
finalScanner.close();
}
@@ -347,7 +353,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
//outputStream.close() is not necessary
byte[] compressedAllRows;
- if (scanNormalComplete.booleanValue()) {
+ if (errorInfo == null) {
allRows = outputStream.toByteArray();
} else {
allRows = new byte[0];
@@ -370,6 +376,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
sb.append(" debugGitTag:" + debugGitTag);
CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
+ if (errorInfo != null) {
+ responseBuilder.setErrorInfo(errorInfo);
+ }
done.run(responseBuilder.//
setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies
setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().
@@ -383,9 +392,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
setFreeSwapSpaceSize(freeSwapSpaceSize).
setHostname(InetAddress.getLocalHost().getHostName()).
setEtcMsg(sb.toString()).
- setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
- .//
- build());
+ setNormalComplete(errorInfo == null ? 1 : 0).build())
+ .build());
} catch (IOException ioe) {
logger.error(ioe.toString(), ioe);