You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/03/14 18:59:02 UTC
[phoenix] branch master updated: PHOENIX-5131 Make spilling to disk for order/group by configurable
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 05b9901 PHOENIX-5131 Make spilling to disk for order/group by configurable
05b9901 is described below
commit 05b99018a77805cd92411c26bd4147e62cbf7281
Author: Abhishek Singh Chouhan <ab...@gmail.com>
AuthorDate: Wed Mar 13 17:34:37 2019 -0700
PHOENIX-5131 Make spilling to disk for order/group by configurable
Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
.../java/org/apache/phoenix/end2end/OrderByIT.java | 45 +++++++
...OrderByWithServerClientSpoolingDisabledIT.java} | 17 ++-
.../end2end/OrderByWithServerMemoryLimitIT.java | 81 ++++++++++++
.../phoenix/end2end/OrderByWithSpillingIT.java | 3 +-
.../phoenix/end2end/SpooledTmpFileDeleteIT.java | 2 +-
.../end2end/join/SortMergeJoinNoSpoolingIT.java | 83 +++++++++++++
.../phoenix/coprocessor/MetaDataProtocol.java | 7 ++
.../phoenix/coprocessor/ScanRegionObserver.java | 4 +-
.../org/apache/phoenix/execute/AggregatePlan.java | 28 ++++-
.../phoenix/execute/ClientAggregatePlan.java | 30 ++++-
.../org/apache/phoenix/execute/ClientScanPlan.java | 16 ++-
.../java/org/apache/phoenix/execute/ScanPlan.java | 10 +-
.../apache/phoenix/execute/SortMergeJoinPlan.java | 138 +++++----------------
.../phoenix/hbase/index/util/VersionUtil.java | 12 ++
.../org/apache/phoenix/iterate/BufferedQueue.java | 20 +--
.../phoenix/iterate/BufferedSortedQueue.java | 33 +----
.../apache/phoenix/iterate/BufferedTupleQueue.java | 134 ++++++++++++++++++++
.../iterate/NonAggregateRegionScannerFactory.java | 45 +++++--
.../iterate/OrderedAggregatingResultIterator.java | 5 +-
.../phoenix/iterate/OrderedResultIterator.java | 72 +++++++++--
.../org/apache/phoenix/iterate/PhoenixQueues.java | 96 ++++++++++++++
.../apache/phoenix/iterate/SizeAwareQueue.java} | 19 +--
.../org/apache/phoenix/iterate/SizeBoundQueue.java | 96 ++++++++++++++
.../phoenix/iterate/SpoolingResultIterator.java | 5 +-
.../org/apache/phoenix/query/QueryServices.java | 11 +-
.../apache/phoenix/query/QueryServicesOptions.java | 19 ++-
.../phoenix/iterate/OrderedResultIteratorTest.java | 55 +++++++-
.../phoenix/query/QueryServicesTestImpl.java | 3 +-
.../org/apache/phoenix/util/MetaDataUtilTest.java | 10 +-
29 files changed, 880 insertions(+), 219 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
index 792d08f..172ed89 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
@@ -20,7 +20,10 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.containsString;
import java.sql.Connection;
import java.sql.Date;
@@ -30,6 +33,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
@@ -461,4 +466,44 @@ public class OrderByIT extends BaseOrderByIT {
conn.close();
}
}
+
+ @Test
+ public void testOrderByWithClientMemoryLimit() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ props.put(QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ Boolean.toString(Boolean.FALSE));
+
+ try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName = generateUniqueName();
+ String ddl =
+ "CREATE TABLE " + tableName + " (a_string varchar not null, col1 integer"
+ + " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+
+ String dml = "UPSERT INTO " + tableName + " VALUES(?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "a");
+ stmt.setInt(2, 40);
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setInt(2, 20);
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setInt(2, 30);
+ stmt.execute();
+ conn.commit();
+
+ String query = "select count(*), col1 from " + tableName + " group by col1 order by 2";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ try {
+ rs.next();
+ fail("Expected PhoenixIOException due to IllegalStateException");
+ } catch (PhoenixIOException e) {
+ assertThat(e.getMessage(), containsString("java.lang.IllegalStateException: "
+ + "Queue full. Consider increasing memory threshold or spooling to disk"));
+ }
+ }
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java
similarity index 66%
copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java
index c5eeaff..caf515f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java
@@ -19,18 +19,29 @@ package org.apache.phoenix.end2end;
import java.util.Map;
+import org.apache.phoenix.iterate.SizeBoundQueue;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import com.google.common.collect.Maps;
-public class OrderByWithSpillingIT extends OrderByIT {
+/**
+ * Same as the order by test but with spooling disabled both on the server and client. This will use
+ * {@link SizeBoundQueue} for all its operations
+ */
+public class OrderByWithServerClientSpoolingDisabledIT extends OrderByIT {
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
- // do lot's of spooling!
- props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ // make sure disabling server side spooling has no effect on correctness (existing orderby
+ // IT)
+ props.put(QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ Boolean.toString(Boolean.FALSE));
+ props.put(QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ Boolean.toString(Boolean.FALSE));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
+
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java
new file mode 100644
index 0000000..2c66614
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class OrderByWithServerMemoryLimitIT extends BaseTest {
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ props.put(QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ Boolean.toString(Boolean.FALSE));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testOrderByWithServerMemoryLimit() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName = generateUniqueName();
+ String ddl =
+ "CREATE TABLE " + tableName + " (a_string varchar not null, col1 integer"
+ + " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+
+ String dml = "UPSERT INTO " + tableName + " VALUES(?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "a");
+ stmt.setInt(2, 40);
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setInt(2, 20);
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setInt(2, 30);
+ stmt.execute();
+ conn.commit();
+
+ String query = "select * from " + tableName + " order by 2";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ try {
+ rs.next();
+ fail("Expected PhoenixIOException due to IllegalStateException");
+ } catch (PhoenixIOException e) {
+ assertThat(e.getMessage(), containsString("java.lang.IllegalStateException: "
+ + "Queue full. Consider increasing memory threshold or spooling to disk"));
+ }
+ }
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
index c5eeaff..80a5123 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
@@ -30,7 +30,8 @@ public class OrderByWithSpillingIT extends OrderByIT {
public static void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
// do lot's of spooling!
- props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ props.put(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
index 9dc82bf..e63c3f6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
@@ -44,7 +44,7 @@ public class SpooledTmpFileDeleteIT extends ParallelStatsDisabledIT {
private Connection getConnection() throws Exception {
Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
props.setProperty(QueryServices.SPOOL_DIRECTORY, spoolDir.getPath());
- props.setProperty(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ props.setProperty(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
props.setProperty(QueryServices.RENEW_LEASE_ENABLED, Boolean.toString(false));
// Ensures round robin off so that spooling is used.
// TODO: review with Samarth - should a Noop iterator be used if pacing is not possible?
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java
new file mode 100644
index 0000000..40b1fe6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end.join;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+public class SortMergeJoinNoSpoolingIT extends SortMergeJoinNoIndexIT {
+
+ public SortMergeJoinNoSpoolingIT(String[] indexDDL, String[] plans) {
+ super(indexDDL, plans);
+ }
+
+ @Parameters(name = "SortMergeJoinNoSpoolingIT_{index}") // name is used by failsafe as file name
+ // in reports
+ public static Collection<Object> data() {
+ return SortMergeJoinNoIndexIT.data();
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB,
+ Boolean.toString(Boolean.FALSE));
+ props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ Integer.toString(10 * 1000 * 1000));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testJoinWithMemoryLimit() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+ String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+ String query =
+ "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM "
+ + tableName1 + " item JOIN " + tableName2
+ + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ try {
+ rs.next();
+ fail("Expected PhoenixIOException due to IllegalStateException");
+ } catch (PhoenixIOException e) {
+ assertThat(e.getMessage(), containsString(
+ "Queue full. Consider increasing memory threshold or spooling to disk"));
+ }
+
+ }
+ }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 7a1d542..458ebe8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -118,6 +118,13 @@ public abstract class MetaDataProtocol extends MetaDataService {
// Version at which we allow SYSTEM.CATALOG to split
public static final int MIN_SPLITTABLE_SYSTEM_CATALOG = VersionUtil.encodeVersion("5", "1", "0");
+ // Version at and after which we will no longer expect client to serialize thresholdBytes for
+ // spooling into the scan
+ public static final int MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD =
+ VersionUtil.encodeVersion("5", "1", "0");
+ public static final int MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD =
+ VersionUtil.encodeVersion("4", "15", "0");
+
// ALWAYS update this map whenever rolling out a new release (major, minor or patch release).
// Key is the SYSTEM.CATALOG timestamp for the version and value is the version string.
private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = new TreeMap<>();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ae2a6fd..08fa321 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -74,11 +74,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver implements Reg
return Optional.of(this);
}
- public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
+ public static void serializeIntoScan(Scan scan, int limit,
+ List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
try {
DataOutputStream output = new DataOutputStream(stream);
- WritableUtils.writeVInt(output, thresholdBytes);
WritableUtils.writeVInt(output, limit);
WritableUtils.writeVInt(output, estimatedRowSize);
WritableUtils.writeVInt(output, orderByExpressions.size());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 0c8e8dc..d7c3048 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -193,8 +193,16 @@ public class AggregatePlan extends BaseQueryPlan {
isAscending=false;
}
OrderByExpression orderByExpression = new OrderByExpression(expression, isNullsLast, isAscending);
- int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
- return new OrderedResultIterator(scanner, Collections.<OrderByExpression>singletonList(orderByExpression), threshold);
+ long threshold =
+ services.getProps().getLong(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+ boolean spoolingEnabled =
+ services.getProps().getBoolean(
+ QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+ return new OrderedResultIterator(scanner,
+ Collections.<OrderByExpression> singletonList(orderByExpression),
+ spoolingEnabled, threshold);
}
}
@@ -306,10 +314,18 @@ public class AggregatePlan extends BaseQueryPlan {
resultScanner = new LimitingResultIterator(resultScanner, limit);
}
} else {
- int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
- QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
- resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(),
- thresholdBytes, limit, offset);
+ long thresholdBytes =
+ context.getConnection().getQueryServices().getProps().getLong(
+ QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+ boolean spoolingEnabled =
+ context.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+ resultScanner =
+ new OrderedAggregatingResultIterator(aggResultIterator,
+ orderBy.getOrderByExpressions(), spoolingEnabled, thresholdBytes, limit,
+ offset);
}
if (context.getSequenceManager().getSequenceCount() > 0) {
resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 60451a5..92bebf1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -145,8 +145,14 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
if (groupBy.isOrderPreserving()) {
aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
} else {
- int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
- (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+ long thresholdBytes =
+ context.getConnection().getQueryServices().getProps().getLong(
+ QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+ boolean spoolingEnabled =
+ context.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
for (Expression keyExpression : keyExpressions) {
keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
@@ -156,7 +162,10 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
// Pass in orderBy to apply any sort that has been optimized away
aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, orderBy);
} else {
- iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+ iterator =
+ new OrderedResultIterator(iterator, keyExpressionOrderBy,
+ spoolingEnabled, thresholdBytes, null, null,
+ projector.getEstimatedRowByteSize());
aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
}
}
@@ -180,9 +189,18 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
resultScanner = new LimitingResultIterator(resultScanner, limit);
}
} else {
- int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
- QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
- resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(), thresholdBytes, limit, offset);
+ long thresholdBytes =
+ context.getConnection().getQueryServices().getProps().getLong(
+ QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+ boolean spoolingEnabled =
+ context.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+ resultScanner =
+ new OrderedAggregatingResultIterator(aggResultIterator,
+ orderBy.getOrderByExpressions(), spoolingEnabled, thresholdBytes, limit,
+ offset);
}
if (context.getSequenceManager().getSequenceCount() > 0) {
resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 3427f5f..4a54a41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -86,10 +86,18 @@ public class ClientScanPlan extends ClientProcessingPlan {
}
if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
- int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
- QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
- iterator = new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(), thresholdBytes, limit,
- offset, projector.getEstimatedRowByteSize());
+ long thresholdBytes =
+ context.getConnection().getQueryServices().getProps().getLong(
+ QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+ boolean spoolingEnabled =
+ context.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+ iterator =
+ new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(),
+ spoolingEnabled, thresholdBytes, limit, offset,
+ projector.getEstimatedRowByteSize());
} else {
if (offset != null) {
iterator = new OffsetResultIterator(iterator, offset);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index cdb2da5..0ad0d1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.ScanRegionObserver;
import org.apache.phoenix.execute.visitor.ByteCountVisitor;
import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
@@ -110,11 +111,10 @@ public class ScanPlan extends BaseQueryPlan {
this.allowPageFilter = allowPageFilter;
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
if (isOrdered) { // TopN
- int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
- QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
- ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes,
- limit == null ? -1 : QueryUtil.getOffsetLimit(limit, offset), orderBy.getOrderByExpressions(),
- projector.getEstimatedRowByteSize());
+ ScanRegionObserver.serializeIntoScan(context.getScan(),
+ limit == null ? -1 : QueryUtil.getOffsetLimit(limit, offset),
+ orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize());
+ ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION);
}
Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index e7966d9..c2686ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
@@ -53,7 +54,9 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.BufferedQueue;
import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.PhoenixQueues;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SizeAwareQueue;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.optimize.Cost;
@@ -94,7 +97,8 @@ public class SortMergeJoinPlan implements QueryPlan {
private final int rhsFieldPosition;
private final boolean isSingleValueOnly;
private final Set<TableRef> tableRefs;
- private final int thresholdBytes;
+ private final long thresholdBytes;
+ private final boolean spoolingEnabled;
private Long estimatedBytes;
private Long estimatedRows;
private Long estimateInfoTs;
@@ -120,8 +124,14 @@ public class SortMergeJoinPlan implements QueryPlan {
this.tableRefs = Sets.newHashSetWithExpectedSize(lhsPlan.getSourceRefs().size() + rhsPlan.getSourceRefs().size());
this.tableRefs.addAll(lhsPlan.getSourceRefs());
this.tableRefs.addAll(rhsPlan.getSourceRefs());
- this.thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
- QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+ this.thresholdBytes =
+ context.getConnection().getQueryServices().getProps().getLong(
+ QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+ this.spoolingEnabled =
+ context.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED);
}
@Override
@@ -294,7 +304,7 @@ public class SortMergeJoinPlan implements QueryPlan {
private ValueBitSet lhsBitSet;
private ValueBitSet rhsBitSet;
private byte[] emptyProjectedValue;
- private BufferedTupleQueue queue;
+ private SizeAwareQueue<Tuple> queue;
private Iterator<Tuple> queueIterator;
public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
@@ -316,14 +326,23 @@ public class SortMergeJoinPlan implements QueryPlan {
int len = lhsBitSet.getEstimatedLength();
this.emptyProjectedValue = new byte[len];
lhsBitSet.toBytes(emptyProjectedValue, 0);
- this.queue = new BufferedTupleQueue(thresholdBytes);
+ this.queue = PhoenixQueues.newTupleQueue(spoolingEnabled, thresholdBytes);
this.queueIterator = null;
}
@Override
public void close() throws SQLException {
SQLException e = closeIterators(lhsIterator, rhsIterator);
- queue.close();
+ try {
+ queue.close();
+ } catch (IOException t) {
+ if (e != null) {
+ e.setNextException(
+ new SQLException("Also encountered exception while closing queue", t));
+ } else {
+ e = new SQLException("Error while closing queue",t);
+ }
+ }
if (e != null) {
throw e;
}
@@ -355,7 +374,11 @@ public class SortMergeJoinPlan implements QueryPlan {
if (lhsKey.equals(rhsKey)) {
next = join(lhsTuple, rhsTuple);
if (nextLhsTuple != null && lhsKey.equals(nextLhsKey)) {
- queue.offer(rhsTuple);
+ try {
+ queue.add(rhsTuple);
+ } catch (IllegalStateException e) {
+ throw new PhoenixIOException(e);
+ }
if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) {
queueIterator = queue.iterator();
advance(true);
@@ -609,107 +632,6 @@ public class SortMergeJoinPlan implements QueryPlan {
}
}
- private static class BufferedTupleQueue extends BufferedQueue<Tuple> {
-
- public BufferedTupleQueue(int thresholdBytes) {
- super(thresholdBytes);
- }
-
- @Override
- protected BufferedSegmentQueue<Tuple> createSegmentQueue(
- int index, int thresholdBytes) {
- return new BufferedTupleSegmentQueue(index, thresholdBytes, false);
- }
-
- @Override
- protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() {
- return new Comparator<BufferedSegmentQueue<Tuple>>() {
- @Override
- public int compare(BufferedSegmentQueue<Tuple> q1,
- BufferedSegmentQueue<Tuple> q2) {
- return q1.index() - q2.index();
- }
- };
- }
-
- @Override
- public Iterator<Tuple> iterator() {
- return new Iterator<Tuple>() {
- private Iterator<BufferedSegmentQueue<Tuple>> queueIter;
- private Iterator<Tuple> currentIter;
- {
- this.queueIter = getSegmentQueues().iterator();
- this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
- }
-
- @Override
- public boolean hasNext() {
- return currentIter != null && currentIter.hasNext();
- }
-
- @Override
- public Tuple next() {
- if (!hasNext())
- return null;
-
- Tuple ret = currentIter.next();
- if (!currentIter.hasNext()) {
- this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
- }
-
- return ret;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- };
- }
-
- private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> {
- private LinkedList<Tuple> results;
-
- public BufferedTupleSegmentQueue(int index,
- int thresholdBytes, boolean hasMaxQueueSize) {
- super(index, thresholdBytes, hasMaxQueueSize);
- this.results = Lists.newLinkedList();
- }
-
- @Override
- protected Queue<Tuple> getInMemoryQueue() {
- return results;
- }
-
- @Override
- protected int sizeOf(Tuple e) {
- KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
- return Bytes.SIZEOF_INT * 2 + kv.getLength();
- }
-
- @Override
- protected void writeToStream(DataOutputStream out, Tuple e) throws IOException {
- KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
- out.writeInt(kv.getLength() + Bytes.SIZEOF_INT);
- out.writeInt(kv.getLength());
- out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
- }
-
- @Override
- protected Tuple readFromStream(DataInputStream in) throws IOException {
- int length = in.readInt();
- if (length < 0)
- return null;
-
- byte[] b = new byte[length];
- in.readFully(b);
- Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
- return new ResultTuple(result);
- }
-
- }
- }
@Override
public boolean useRoundRobinIterator() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java
index 42d07f5..fd02ab5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java
@@ -73,4 +73,16 @@ public class VersionUtil {
version |= (major << Byte.SIZE * 2);
return version;
}
+
+ public static int decodeMajorVersion(int encodedVersion) {
+ return (encodedVersion >> Byte.SIZE * 2);
+ }
+
+ public static int decodeMinorVersion(int encodedVersion) {
+ return (encodedVersion >> Byte.SIZE) & 0xFF;
+ }
+
+ public static int decodePatchVersion(int encodedVersion) {
+ return encodedVersion & 0xFF;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
index 6f6c523..1a646e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
@@ -36,14 +36,14 @@ import java.util.UUID;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
-public abstract class BufferedQueue<T> extends AbstractQueue<T> {
- private final int thresholdBytes;
+public abstract class BufferedQueue<T> extends AbstractQueue<T> implements SizeAwareQueue<T> {
+ private final long thresholdBytes;
private List<BufferedSegmentQueue<T>> queues;
private int currentIndex;
private BufferedSegmentQueue<T> currentQueue;
private MinMaxPriorityQueue<BufferedSegmentQueue<T>> mergedQueue;
- public BufferedQueue(int thresholdBytes) {
+ public BufferedQueue(long thresholdBytes) {
this.thresholdBytes = thresholdBytes;
this.queues = Lists.<BufferedSegmentQueue<T>> newArrayList();
this.currentIndex = -1;
@@ -51,7 +51,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
this.mergedQueue = null;
}
- abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
+ abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, long thresholdBytes);
abstract protected Comparator<BufferedSegmentQueue<T>> getSegmentQueueComparator();
@@ -122,10 +122,12 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
return size;
}
+ @Override
public long getByteSize() {
return currentQueue == null ? 0 : currentQueue.getInMemByteSize();
}
+ @Override
public void close() {
for (BufferedSegmentQueue<T> queue : queues) {
queue.close();
@@ -150,10 +152,10 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
protected static final int EOF = -1;
private final int index;
- private final int thresholdBytes;
+ private final long thresholdBytes;
private final boolean hasMaxQueueSize;
private long totalResultSize = 0;
- private int maxResultSize = 0;
+ private long maxResultSize = 0;
private File file;
private boolean isClosed = false;
private boolean flushBuffer = false;
@@ -163,7 +165,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
// iterators to close on close()
private List<SegmentQueueFileIterator> iterators;
- public BufferedSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
+ public BufferedSegmentQueue(int index, long thresholdBytes, boolean hasMaxQueueSize) {
this.index = index;
this.thresholdBytes = thresholdBytes;
this.hasMaxQueueSize = hasMaxQueueSize;
@@ -171,7 +173,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
}
abstract protected Queue<T> getInMemoryQueue();
- abstract protected int sizeOf(T e);
+ abstract protected long sizeOf(T e);
abstract protected void writeToStream(DataOutputStream out, T e) throws IOException;
abstract protected T readFromStream(DataInputStream in) throws IOException;
@@ -296,7 +298,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
private void flush(T entry) throws IOException {
Queue<T> inMemQueue = getInMemoryQueue();
- int resultSize = sizeOf(entry);
+ long resultSize = sizeOf(entry);
maxResultSize = Math.max(maxResultSize, resultSize);
totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
if (totalResultSize >= thresholdBytes) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
index 700991f..80e20d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
@@ -42,7 +42,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
private final int limit;
public BufferedSortedQueue(Comparator<ResultEntry> comparator,
- Integer limit, int thresholdBytes) throws IOException {
+ Integer limit, long thresholdBytes) throws IOException {
super(thresholdBytes);
this.comparator = comparator;
this.limit = limit == null ? -1 : limit;
@@ -50,7 +50,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
@Override
protected BufferedSegmentQueue<ResultEntry> createSegmentQueue(
- int index, int thresholdBytes) {
+ int index, long thresholdBytes) {
return new BufferedResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
}
@@ -68,7 +68,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
private MinMaxPriorityQueue<ResultEntry> results = null;
public BufferedResultEntryPriorityQueue(int index,
- int thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
+ long thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
super(index, thresholdBytes, limit >= 0);
this.results = limit < 0 ?
MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
@@ -81,8 +81,8 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
}
@Override
- protected int sizeOf(ResultEntry e) {
- return sizeof(e.sortKeys) + sizeof(toKeyValues(e));
+ protected long sizeOf(ResultEntry e) {
+ return ResultEntry.sizeOf(e);
}
@Override
@@ -147,28 +147,5 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
return kvs;
}
- private int sizeof(List<KeyValue> kvs) {
- int size = Bytes.SIZEOF_INT; // totalLen
-
- for (KeyValue kv : kvs) {
- size += kv.getLength();
- size += Bytes.SIZEOF_INT; // kv.getLength
- }
-
- return size;
- }
-
- private int sizeof(ImmutableBytesWritable[] sortKeys) {
- int size = Bytes.SIZEOF_INT;
- if (sortKeys != null) {
- for (ImmutableBytesWritable sortKey : sortKeys) {
- if (sortKey != null) {
- size += sortKey.getLength();
- }
- size += Bytes.SIZEOF_INT;
- }
- }
- return size;
- }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java
new file mode 100644
index 0000000..7297a78
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.ResultUtil;
+
+import com.google.common.collect.Lists;
+
+public class BufferedTupleQueue extends BufferedQueue<Tuple> {
+
+ public BufferedTupleQueue(long thresholdBytes) {
+ super(thresholdBytes);
+ }
+
+ @Override
+ protected BufferedSegmentQueue<Tuple> createSegmentQueue(int index, long thresholdBytes) {
+ return new BufferedTupleSegmentQueue(index, thresholdBytes, false);
+ }
+
+ @Override
+ protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() {
+ return new Comparator<BufferedSegmentQueue<Tuple>>() { // orders segments by ascending index()
+ @Override
+ public int compare(BufferedSegmentQueue<Tuple> q1, BufferedSegmentQueue<Tuple> q2) {
+ return Integer.compare(q1.index(), q2.index()); // compares without int subtraction
+ }
+ };
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new Iterator<Tuple>() {
+ private Iterator<BufferedSegmentQueue<Tuple>> queueIter;
+ private Iterator<Tuple> currentIter;
+ {
+ this.queueIter = getSegmentQueues().iterator();
+ this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentIter != null && currentIter.hasNext();
+ }
+
+ @Override
+ public Tuple next() {
+ if (!hasNext()) return null;
+
+ Tuple ret = currentIter.next();
+ if (!currentIter.hasNext()) {
+ this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+
+ private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> {
+ private LinkedList<Tuple> results;
+
+ public BufferedTupleSegmentQueue(int index, long thresholdBytes, boolean hasMaxQueueSize) {
+ super(index, thresholdBytes, hasMaxQueueSize);
+ this.results = Lists.newLinkedList();
+ }
+
+ @Override
+ protected Queue<Tuple> getInMemoryQueue() {
+ return results;
+ }
+
+ @Override
+ protected long sizeOf(Tuple e) {
+ KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
+ return Bytes.SIZEOF_INT * 2 + kv.getLength();
+ }
+
+ @Override
+ protected void writeToStream(DataOutputStream out, Tuple e) throws IOException {
+ KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
+ out.writeInt(kv.getLength() + Bytes.SIZEOF_INT);
+ out.writeInt(kv.getLength());
+ out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
+ }
+
+ @Override
+ protected Tuple readFromStream(DataInputStream in) throws IOException {
+ int length = in.readInt();
+ if (length < 0) return null;
+
+ byte[] b = new byte[length];
+ in.readFully(b);
+ Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
+ return new ResultTuple(result);
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 39bd4ca..4bbb8d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.HashJoinRegionScanner;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -50,11 +51,14 @@ import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.expression.function.ArrayIndexFunction;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.ValueBitSet;
@@ -68,6 +72,7 @@ import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -161,7 +166,15 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset),
scan.getAttribute(QueryConstants.LAST_SCAN) != null);
}
- final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner);
+ boolean spoolingEnabled =
+ env.getConfiguration().getBoolean(
+ QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED);
+ long thresholdBytes =
+ env.getConfiguration().getLong(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
+ final OrderedResultIterator iterator =
+ deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes);
if (iterator == null) {
return innerScanner;
}
@@ -169,15 +182,31 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
return getTopNScanner(env, innerScanner, iterator, tenantId);
}
- private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+ @VisibleForTesting
+ static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
+ boolean spoolingEnabled, long thresholdBytes) {
byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
if (topN == null) {
return null;
- }
- ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
- try {
+ }
+ int clientVersion = ScanUtil.getClientVersion(scan);
+ // Clients at or after 4.15 and 5.1 no longer serialize thresholdBytes, so it is
+ // decoded only for older clients in order to preserve wire compatibility
+ boolean shouldDecodeSpoolThreshold =
+ (scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION) == null)
+ || (VersionUtil.decodeMajorVersion(clientVersion) > 5)
+ || (VersionUtil.decodeMajorVersion(clientVersion) == 5
+ && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD)
+ || (VersionUtil.decodeMajorVersion(clientVersion) == 4
+ && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD);
+ ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
+ try {
DataInputStream input = new DataInputStream(stream);
- int thresholdBytes = WritableUtils.readVInt(input);
+ if (shouldDecodeSpoolThreshold) {
+ // Consume the client-sent thresholdBytes from the stream but ignore it; the
+ // threshold configured on the server is honored instead
+ WritableUtils.readVInt(input);
+ }
int limit = WritableUtils.readVInt(input);
int estimatedRowSize = WritableUtils.readVInt(input);
int size = WritableUtils.readVInt(input);
@@ -189,8 +218,8 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
}
PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
- return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
- estimatedRowSize);
+ return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
+ thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
index 51a7dd8..ef4b607 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
@@ -36,9 +36,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
public class OrderedAggregatingResultIterator extends OrderedResultIterator implements AggregatingResultIterator {
public OrderedAggregatingResultIterator(AggregatingResultIterator delegate,
- List<OrderByExpression> orderByExpressions, int thresholdBytes, Integer limit, Integer offset)
+ List<OrderByExpression> orderByExpressions, boolean spoolingEnabled, long thresholdBytes,
+ Integer limit, Integer offset)
throws SQLException {
- super(delegate, orderByExpressions, thresholdBytes, limit, offset);
+ super(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 22712ff..ec9929e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -22,16 +22,22 @@ import static com.google.common.base.Preconditions.checkPositionIndex;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Queue;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;
@@ -68,6 +74,44 @@ public class OrderedResultIterator implements PeekingResultIterator {
Tuple getResult() {
return result;
}
+
+ static long sizeOf(ResultEntry e) {
+ return sizeof(e.sortKeys) + sizeof(toKeyValues(e));
+ }
+
+ private static long sizeof(List<KeyValue> kvs) {
+ long size = Bytes.SIZEOF_INT; // totalLen
+
+ for (KeyValue kv : kvs) {
+ size += kv.getLength();
+ size += Bytes.SIZEOF_INT; // kv.getLength
+ }
+
+ return size;
+ }
+
+ private static long sizeof(ImmutableBytesWritable[] sortKeys) {
+ long size = Bytes.SIZEOF_INT;
+ if (sortKeys != null) {
+ for (ImmutableBytesWritable sortKey : sortKeys) {
+ if (sortKey != null) {
+ size += sortKey.getLength();
+ }
+ size += Bytes.SIZEOF_INT;
+ }
+ }
+ return size;
+ }
+
+ private static List<KeyValue> toKeyValues(ResultEntry entry) {
+ Tuple result = entry.getResult();
+ int size = result.size();
+ List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+ for (int i = 0; i < size; i++) {
+ kvs.add(PhoenixKeyValueUtil.maybeCopyCell(result.getValue(i)));
+ }
+ return kvs;
+ }
}
/** A function that returns Nth key for a given {@link ResultEntry}. */
@@ -91,7 +135,8 @@ public class OrderedResultIterator implements PeekingResultIterator {
}
};
- private final int thresholdBytes;
+ private final boolean spoolingEnabled;
+ private final long thresholdBytes;
private final Integer limit;
private final Integer offset;
private final ResultIterator delegate;
@@ -106,20 +151,22 @@ public class OrderedResultIterator implements PeekingResultIterator {
}
public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
- int thresholdBytes, Integer limit, Integer offset) {
- this(delegate, orderByExpressions, thresholdBytes, limit, offset, 0);
+ boolean spoolingEnabled, long thresholdBytes, Integer limit, Integer offset) {
+ this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0);
}
public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
- int thresholdBytes) throws SQLException {
- this(delegate, orderByExpressions, thresholdBytes, null, null);
+ boolean spoolingEnabled, long thresholdBytes) throws SQLException {
+ this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, null, null);
}
- public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
- int thresholdBytes, Integer limit, Integer offset,int estimatedRowSize) {
+ public OrderedResultIterator(ResultIterator delegate,
+ List<OrderByExpression> orderByExpressions, boolean spoolingEnabled,
+ long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) {
checkArgument(!orderByExpressions.isEmpty());
this.delegate = delegate;
this.orderByExpressions = orderByExpressions;
+ this.spoolingEnabled = spoolingEnabled;
this.thresholdBytes = thresholdBytes;
this.offset = offset == null ? 0 : offset;
if (limit != null) {
@@ -208,8 +255,9 @@ public class OrderedResultIterator implements PeekingResultIterator {
List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
try{
- final BufferedSortedQueue queueEntries = new BufferedSortedQueue(comparator, limit,
- thresholdBytes);
+ final SizeAwareQueue<ResultEntry> queueEntries =
+ PhoenixQueues.newResultEntrySortedQueue(comparator, limit, spoolingEnabled,
+ thresholdBytes);
resultIterator = new PeekingResultIterator() {
int count = 0;
@@ -249,7 +297,11 @@ public class OrderedResultIterator implements PeekingResultIterator {
@Override
public void close() throws SQLException {
- queueEntries.close();
+ try {
+ queueEntries.close();
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
}
};
for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java
new file mode 100644
index 0000000..1685f5e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.LinkedList;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+
+public class PhoenixQueues {
+
+ private PhoenixQueues() {
+ }
+
+ public static SizeAwareQueue<ResultEntry> newBufferedResultEntrySortedQueue(
+ Comparator<ResultEntry> comparator, Integer limit, long thresholdBytes)
+ throws IOException {
+ return new BufferedSortedQueue(comparator, limit, thresholdBytes);
+ }
+
+ public static SizeAwareQueue<Tuple> newBufferedTupleQueue(long thresholdBytes) {
+ return new BufferedTupleQueue(thresholdBytes);
+ }
+
+ public static SizeAwareQueue<ResultEntry> newSizeBoundResultEntrySortedQueue(
+ Comparator<ResultEntry> comparator, Integer limit, long maxSizeBytes) {
+ limit = limit == null ? -1 : limit;
+ MinMaxPriorityQueue<ResultEntry> queue =
+ limit < 0 ? MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
+ : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit)
+ .create();
+ return new SizeBoundQueue<ResultEntry>(maxSizeBytes, queue) {
+ @Override
+ public long sizeOf(org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry e) {
+ return ResultEntry.sizeOf(e);
+ }
+
+ };
+ }
+
+ // Tuple queue whose tracked byte size is bounded by maxSizeBytes; built on a plain JDK LinkedList.
+ public static SizeAwareQueue<Tuple> newSizeBoundTupleQueue(long maxSizeBytes) {
+ LinkedList<Tuple> results = new LinkedList<Tuple>();
+ return new SizeBoundQueue<Tuple>(maxSizeBytes, results) {
+ @Override
+ public long sizeOf(Tuple e) {
+ // Same per-tuple formula as BufferedTupleQueue: two ints plus the first cell's length.
+ KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
+ return Bytes.SIZEOF_INT * 2 + kv.getLength();
+ }
+ };
+ }
+
+ public static SizeAwareQueue<ResultEntry> newResultEntrySortedQueue(
+ Comparator<ResultEntry> comparator, Integer limit, boolean spoolingEnabled,
+ long thresholdBytes) throws IOException {
+ if (spoolingEnabled) {
+ return newBufferedResultEntrySortedQueue(comparator, limit, thresholdBytes);
+ } else {
+ return newSizeBoundResultEntrySortedQueue(comparator, limit, thresholdBytes);
+ }
+ }
+
+ public static SizeAwareQueue<Tuple> newTupleQueue(boolean spoolingEnabled,
+ long thresholdBytes) {
+ if (spoolingEnabled) {
+ return newBufferedTupleQueue(thresholdBytes);
+ } else {
+ return newSizeBoundTupleQueue(thresholdBytes);
+ }
+ }
+
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java
similarity index 56%
copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
copy to phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java
index c5eeaff..73b3554 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java
@@ -15,22 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.end2end;
+package org.apache.phoenix.iterate;
-import java.util.Map;
+import java.io.Closeable;
+import java.util.Queue;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
+public interface SizeAwareQueue<T> extends Queue<T>, Closeable {
-import com.google.common.collect.Maps;
+ public long getByteSize();
-public class OrderByWithSpillingIT extends OrderByIT {
- @BeforeClass
- public static void doSetup() throws Exception {
- Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
- // do lot's of spooling!
- props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
- }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java
new file mode 100644
index 0000000..eb1e6be
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * A {@link SizeAwareQueue} that wraps another queue and refuses new elements once the
+ * accumulated byte size of the held elements would reach a fixed limit.
+ * <p>
+ * The size of each element is supplied by subclasses through {@link #sizeOf(Object)}. The
+ * running total is increased on every successful {@link #offer(Object)}, decreased on
+ * {@link #poll()} and on removal through {@link #iterator()}, and reset by {@link #close()}.
+ *
+ * @param <T> the element type
+ */
+public abstract class SizeBoundQueue<T> extends AbstractQueue<T> implements SizeAwareQueue<T> {
+
+    private final long maxSizeBytes;
+    private final Queue<T> delegate;
+    private long currentSize;
+
+    /**
+     * @param maxSizeBytes exclusive upper bound for the accumulated element size; must be
+     *            positive (only checked when assertions are enabled)
+     * @param delegate the queue that actually stores and orders the elements
+     */
+    public SizeBoundQueue(long maxSizeBytes, Queue<T> delegate) {
+        assert maxSizeBytes > 0;
+        this.maxSizeBytes = maxSizeBytes;
+        this.delegate = delegate;
+    }
+
+    /**
+     * Computes the size, in bytes, that the given element contributes to this queue.
+     *
+     * @param e the element to measure
+     * @return the size of {@code e} in bytes
+     */
+    public abstract long sizeOf(T e);
+
+    /**
+     * Adds the element to the delegate if the accumulated size, including this element,
+     * stays strictly below the configured maximum.
+     *
+     * @return {@code true} if the delegate accepted the element; {@code false} if the size
+     *         limit would be reached or the delegate rejected it
+     */
+    @Override
+    public boolean offer(T e) {
+        boolean success = false;
+        long elementSize = sizeOf(e);
+        if ((currentSize + elementSize) < maxSizeBytes) {
+            success = delegate.offer(e);
+            if (success) {
+                currentSize += elementSize;
+            }
+        }
+        return success;
+    }
+
+    /**
+     * Same as {@link AbstractQueue#add(Object)}, but replaces the generic failure with a
+     * message that points at the memory threshold / spooling settings.
+     *
+     * @throws IllegalStateException if the element cannot be added
+     */
+    @Override
+    public boolean add(T e) {
+        try {
+            return super.add(e);
+        } catch (IllegalStateException ex) {
+            throw new IllegalStateException(
+                    "Queue full. Consider increasing memory threshold or spooling to disk", ex);
+        }
+    }
+
+    /**
+     * Removes the head of the delegate and releases its share of the accumulated size.
+     *
+     * @return the removed head, or {@code null} if the queue is empty
+     */
+    @Override
+    public T poll() {
+        T e = delegate.poll();
+        if (e != null) {
+            currentSize -= sizeOf(e);
+        }
+        return e;
+    }
+
+    @Override
+    public T peek() {
+        return delegate.peek();
+    }
+
+    /**
+     * Discards all elements held by the delegate and resets the accumulated size, so that
+     * {@link #getByteSize()} does not keep reporting the size of elements that are gone.
+     */
+    @Override
+    public void close() throws IOException {
+        delegate.clear();
+        currentSize = 0;
+    }
+
+    /**
+     * @return the accumulated size, in bytes, of the elements currently held
+     */
+    @Override
+    public long getByteSize() {
+        return currentSize;
+    }
+
+    /**
+     * Returns an iterator over the delegate's elements. Removal through the iterator is
+     * forwarded to the delegate's iterator and, when it succeeds, the removed element's size
+     * is subtracted from the accumulated size so the byte accounting stays consistent.
+     */
+    @Override
+    public Iterator<T> iterator() {
+        final Iterator<T> it = delegate.iterator();
+        return new Iterator<T>() {
+            private T last;
+
+            @Override
+            public boolean hasNext() {
+                return it.hasNext();
+            }
+
+            @Override
+            public T next() {
+                last = it.next();
+                return last;
+            }
+
+            @Override
+            public void remove() {
+                // Let the delegate validate the call first; it throws if remove() is not
+                // legal here, in which case the accounting must stay untouched.
+                it.remove();
+                currentSize -= sizeOf(last);
+            }
+        };
+    }
+
+    @Override
+    public int size() {
+        return delegate.size();
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 3136ca8..fa90b1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -95,7 +95,8 @@ public class SpoolingResultIterator implements PeekingResultIterator {
private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics, MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services) throws SQLException {
this (spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(),
- services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
+ services.getProps().getLong(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES),
services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
}
@@ -109,7 +110,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
* the memory manager) is exceeded.
* @throws SQLException
*/
- SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+ SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final long thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
this.spoolMetrics = sMetrics;
this.memoryMetrics = mMetrics;
boolean success = false;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e38f372..6742cb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -44,7 +44,16 @@ public interface QueryServices extends SQLCloseable {
public static final String THREAD_POOL_SIZE_ATTRIB = "phoenix.query.threadPoolSize";
public static final String QUEUE_SIZE_ATTRIB = "phoenix.query.queueSize";
public static final String THREAD_TIMEOUT_MS_ATTRIB = "phoenix.query.timeoutMs";
- public static final String SPOOL_THRESHOLD_BYTES_ATTRIB = "phoenix.query.spoolThresholdBytes";
+ public static final String SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB =
+ "phoenix.query.server.spoolThresholdBytes";
+ public static final String CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB =
+ "phoenix.query.client.spoolThresholdBytes";
+ public static final String CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB =
+ "phoenix.query.client.orderBy.spooling.enabled";
+ public static final String CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB =
+ "phoenix.query.client.join.spooling.enabled";
+ public static final String SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB =
+ "phoenix.query.server.orderBy.spooling.enabled";
public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab";
public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal";
public static final String SPOOL_DIRECTORY = "phoenix.spool.directory";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b6755d4..b7fc119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
+import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
@@ -79,10 +80,10 @@ import static org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_I
import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC;
import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
+import static org.apache.phoenix.query.QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
-import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_COLLECTION_ENABLED;
import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
@@ -134,6 +135,11 @@ public class QueryServicesOptions {
public static final int DEFAULT_QUEUE_SIZE = 5000;
public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+ public static final int DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+ public static final int DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+ public static final boolean DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED = true;
+ public static final boolean DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED = true;
+ public static final boolean DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED = true;
public static final String DEFAULT_SPOOL_DIRECTORY = System.getProperty("java.io.tmpdir");
public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
@@ -408,7 +414,8 @@ public class QueryServicesOptions {
.setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE)
.setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
.setIfUnset(THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS)
- .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SPOOL_THRESHOLD_BYTES)
+ .setIfUnset(CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES)
+ .setIfUnset(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES)
.setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY)
.setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC)
.setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
@@ -522,8 +529,12 @@ public class QueryServicesOptions {
return set(THREAD_TIMEOUT_MS_ATTRIB, threadTimeoutMs);
}
- public QueryServicesOptions setSpoolThresholdBytes(int spoolThresholdBytes) {
- return set(SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
+ /**
+ * Sets CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB ("phoenix.query.client.spoolThresholdBytes")
+ * to the given number of bytes and returns the result of the underlying set call.
+ */
+ public QueryServicesOptions setClientSpoolThresholdBytes(long spoolThresholdBytes) {
+ return set(CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
+ }
+
+ /**
+ * Sets SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB ("phoenix.query.server.spoolThresholdBytes")
+ * to the given number of bytes and returns the result of the underlying set call.
+ */
+ public QueryServicesOptions setServerSpoolThresholdBytes(long spoolThresholdBytes) {
+ return set(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
}
public QueryServicesOptions setSpoolDirectory(String spoolDirectory) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
index 50ed8e9..88bef47 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
@@ -16,12 +16,23 @@
*/
package org.apache.phoenix.iterate;
+import static org.junit.Assert.fail;
+
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.util.ScanUtil;
import org.junit.Test;
+import org.mockito.Mockito;
/**
* Test class for {@link OrderedResultIterator}.
@@ -33,9 +44,47 @@ public class OrderedResultIteratorTest {
ResultIterator delegate = ResultIterator.EMPTY_ITERATOR;
List<OrderByExpression> orderByExpressions = Collections.singletonList(null);
int thresholdBytes = Integer.MAX_VALUE;
- OrderedResultIterator iterator = new OrderedResultIterator(delegate, orderByExpressions, thresholdBytes);
- // Should not throw an exception
+ boolean spoolingEnabled = true;
+ OrderedResultIterator iterator =
+ new OrderedResultIterator(delegate, orderByExpressions, spoolingEnabled,
+ thresholdBytes);
+ // Should not throw an exception
iterator.close();
- }
+ }
+
+    /**
+     * Serializes an ORDER BY into a scan once and then deserializes it while pretending to
+     * be clients of different versions: 5.1.0, 5.2.0, 4.15.0 and 4.15.1 must deserialize
+     * without error, while 5.0.0 and 4.14.0 must be rejected with an
+     * {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testSpoolingBackwardCompatibility() {
+        RegionScanner scanner = Mockito.mock(RegionScanner.class);
+        Scan scan = new Scan();
+        Expression literal = LiteralExpression.newConstant(Boolean.TRUE);
+        OrderByExpression orderBy = new OrderByExpression(literal, false, false);
+        ScanRegionObserver.serializeIntoScan(scan, 0, Arrays.asList(orderBy), 100);
+
+        // Check 5.1.0 and > 5.1.0, then 4.15.0 and > 4.15.0
+        String[] acceptedVersions = { "5.1.0", "5.2.0", "4.15.0", "4.15.1" };
+        for (String version : acceptedVersions) {
+            ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion(version));
+            NonAggregateRegionScannerFactory.deserializeFromScan(scan, scanner, false, 100);
+        }
+
+        // Check < 5.1, then < 4.15
+        String[] rejectedVersions = { "5.0.0", "4.14.0" };
+        for (String version : rejectedVersions) {
+            ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion(version));
+            try {
+                NonAggregateRegionScannerFactory.deserializeFromScan(scan, scanner, false, 100);
+                fail("Deserialize should fail for " + version
+                        + " since we didn't serialize thresholdBytes");
+            } catch (IllegalArgumentException expected) {
+                // this is the outcome the test is looking for
+            }
+        }
+    }
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 59e7fd3..df9d843 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -99,7 +99,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE)
.setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC)
.setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS)
- .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
+ .setClientSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
+ .setServerSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
.setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY)
.setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
.setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
index b16e401..7c8e021 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
@@ -83,7 +83,15 @@ public class MetaDataUtilTest {
assertTrue(VersionUtil.encodeVersion("0.94.1-mapR")>VersionUtil.encodeVersion("0.94"));
assertTrue(VersionUtil.encodeVersion("1", "1", "3")>VersionUtil.encodeVersion("1", "1", "1"));
}
-
+
+    /**
+     * Checks that the major, minor and patch components can be read back from a version
+     * encoded with {@code VersionUtil.encodeVersion}.
+     */
+    @Test
+    public void testDecode() {
+        int encodedVersion = VersionUtil.encodeVersion("4.15.5");
+        // assertEquals takes (expected, actual); keeping that order makes a failure report
+        // the literal as the expected value rather than as the observed one.
+        assertEquals(4, VersionUtil.decodeMajorVersion(encodedVersion));
+        assertEquals(15, VersionUtil.decodeMinorVersion(encodedVersion));
+        assertEquals(5, VersionUtil.decodePatchVersion(encodedVersion));
+    }
+
@Test
public void testCompatibility() {
assertTrue(MetaDataUtil.areClientAndServerCompatible(VersionUtil.encodeVersion(1,2,1), 1, 2));