You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/02/19 07:24:33 UTC
[1/2] phoenix git commit: PHOENIX-2691 Exception while unpacking
resultset containing VARCHAR ARRAY of unspecified length
Repository: phoenix
Updated Branches:
refs/heads/master 45a9d670b -> 28a8b802c
PHOENIX-2691 Exception while unpacking resultset containing VARCHAR ARRAY of unspecified length
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cac03056
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cac03056
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cac03056
Branch: refs/heads/master
Commit: cac03056578170a82ba812aa4648e0e5b1a1bbb6
Parents: 45a9d67
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Feb 18 14:59:29 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Feb 18 15:14:48 2016 -0800
----------------------------------------------------------------------
.../apache/phoenix/end2end/GroupByCaseIT.java | 35 +++++++++
.../apache/phoenix/compile/GroupByCompiler.java | 74 ++++++++++++--------
.../phoenix/exception/SQLExceptionCode.java | 5 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 6 +-
.../phoenix/compile/QueryCompilerTest.java | 36 ++++++++++
5 files changed, 122 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cac03056/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
index 0f1568c..172f9f7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
@@ -226,4 +226,39 @@ public class GroupByCaseIT extends BaseHBaseManagedTimeIT {
conn.close();
}
+
+ @Test
+ public void testGroupByArray() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.createStatement().execute("CREATE TABLE test1(\n" +
+ " a VARCHAR NOT NULL,\n" +
+ " b VARCHAR,\n" +
+ " c INTEGER,\n" +
+ " d VARCHAR,\n" +
+ " e VARCHAR ARRAY,\n" +
+ " f BIGINT,\n" +
+ " g BIGINT,\n" +
+ " CONSTRAINT pk PRIMARY KEY(a)\n" +
+ ")");
+ conn.createStatement().execute("UPSERT INTO test1 VALUES('1', 'val', 100, 'a', ARRAY ['b'], 1, 2)");
+ conn.createStatement().execute("UPSERT INTO test1 VALUES('2', 'val', 100, 'a', ARRAY ['b'], 3, 4)");
+ conn.createStatement().execute("UPSERT INTO test1 VALUES('3', 'val', 100, 'a', ARRAY ['b','c'], 5, 6)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT c, SUM(f + g) AS sumone, d, e\n" +
+ "FROM test1\n" +
+ "WHERE b = 'val'\n" +
+ " AND a IN ('1','2','3')\n" +
+ "GROUP BY c, d, e\n" +
+ "ORDER BY sumone DESC");
+ assertTrue(rs.next());
+ assertEquals(100, rs.getInt(1));
+ assertEquals(11, rs.getLong(2));
+ assertTrue(rs.next());
+ assertEquals(100, rs.getInt(1));
+ assertEquals(10, rs.getLong(2));
+ assertFalse(rs.next());
+ conn.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cac03056/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
index 7d9df02..85478bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
@@ -38,8 +38,8 @@ import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PDecimal;
-import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -217,34 +217,53 @@ public class GroupByCompiler {
public int compare(Pair<Integer,Expression> gb1, Pair<Integer,Expression> gb2) {
Expression e1 = gb1.getSecond();
Expression e2 = gb2.getSecond();
- boolean isFixed1 = e1.getDataType().isFixedWidth();
- boolean isFixed2 = e2.getDataType().isFixedWidth();
+ PDataType t1 = e1.getDataType();
+ PDataType t2 = e2.getDataType();
+ boolean isFixed1 = t1.isFixedWidth();
+ boolean isFixed2 = t2.isFixedWidth();
boolean isFixedNullable1 = e1.isNullable() &&isFixed1;
boolean isFixedNullable2 = e2.isNullable() && isFixed2;
- if (isFixedNullable1 == isFixedNullable2) {
- if (isFixed1 == isFixed2) {
- // Not strictly necessary, but forces the order to match the schema
- // column order (with PK columns before value columns).
- //return o1.getColumnPosition() - o2.getColumnPosition();
- return gb1.getFirst() - gb2.getFirst();
- } else if (isFixed1) {
- return -1;
- } else {
+ boolean oae1 = onlyAtEndType(e1);
+ boolean oae2 = onlyAtEndType(e2);
+ if (oae1 == oae2) {
+ if (isFixedNullable1 == isFixedNullable2) {
+ if (isFixed1 == isFixed2) {
+ // Not strictly necessary, but forces the order to match the schema
+ // column order (with PK columns before value columns).
+ //return o1.getColumnPosition() - o2.getColumnPosition();
+ return gb1.getFirst() - gb2.getFirst();
+ } else if (isFixed1) {
+ return -1;
+ } else {
+ return 1;
+ }
+ } else if (isFixedNullable1) {
return 1;
+ } else {
+ return -1;
}
- } else if (isFixedNullable1) {
+ } else if (oae1) {
return 1;
} else {
return -1;
}
}
});
+ boolean foundOnlyAtEndType = false;
for (Pair<Integer,Expression> groupBy : groupBys) {
- expressions.add(groupBy.getSecond());
+ Expression e = groupBy.getSecond();
+ if (onlyAtEndType(e)) {
+ if (foundOnlyAtEndType) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNSUPPORTED_GROUP_BY_EXPRESSIONS)
+ .setMessage(e.toString()).build().buildException();
+ }
+ foundOnlyAtEndType = true;
+ }
+ expressions.add(e);
}
for (int i = expressions.size()-2; i >= 0; i--) {
Expression expression = expressions.get(i);
- PDataType keyType = getKeyType(expression);
+ PDataType keyType = getGroupByDataType(expression);
if (keyType == expression.getDataType()) {
continue;
}
@@ -263,19 +282,16 @@ public class GroupByCompiler {
return groupBy;
}
- private static PDataType getKeyType(Expression expression) {
- PDataType type = expression.getDataType();
- if (!expression.isNullable() || !type.isFixedWidth()) {
- return type;
- }
- if (type.isCastableTo(PDecimal.INSTANCE)) {
- return PDecimal.INSTANCE;
- }
- if (type.isCastableTo(PVarchar.INSTANCE)) {
- return PVarchar.INSTANCE;
- }
- // This might happen if someone tries to group by an array
- throw new IllegalStateException("Multiple occurrences of type " + type + " may not occur in a GROUP BY clause");
+ private static boolean onlyAtEndType(Expression expression) {
+ // Due to the encoding scheme of these types, at most one such
+ // expression may be used in a group by, and it must be located
+ // at the end of the group by row key.
+ PDataType type = getGroupByDataType(expression);
+ return type.isArrayType() || type == PVarbinary.INSTANCE;
+ }
+
+ private static PDataType getGroupByDataType(Expression expression) {
+ return IndexUtil.getIndexColumnDataType(expression.isNullable(), expression.getDataType());
}
private GroupByCompiler() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cac03056/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 7ddd14c..65cb6db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -229,7 +229,6 @@ public enum SQLExceptionCode {
AGGREGATE_WITH_NOT_GROUP_BY_COLUMN(1018, "42Y27", "Aggregate may not contain columns not in GROUP BY."),
ONLY_AGGREGATE_IN_HAVING_CLAUSE(1019, "42Y26", "Only aggregate maybe used in the HAVING clause."),
UPSERT_COLUMN_NUMBERS_MISMATCH(1020, "42Y60", "Number of columns upserting must match number of values."),
- NO_TABLE_SPECIFIED_FOR_WILDCARD_SELECT(1057, "42Y10", "No table specified for wildcard select."),
// Table properties exception.
INVALID_BUCKET_NUM(1021, "42Y80", "Salt bucket numbers should be with 1 and 256."),
NO_SPLITS_ON_SALTED_TABLE(1022, "42Y81", "Should not specify split points on salted table with default row key order."),
@@ -263,8 +262,10 @@ public enum SQLExceptionCode {
UNALLOWED_LOCAL_INDEXES(1055, "43A12", "Local secondary indexes are configured to not be allowed."),
DESC_VARBINARY_NOT_SUPPORTED(1056, "43A13", "Descending VARBINARY columns not supported."),
+ NO_TABLE_SPECIFIED_FOR_WILDCARD_SELECT(1057, "42Y10", "No table specified for wildcard select."),
+ UNSUPPORTED_GROUP_BY_EXPRESSIONS(1058, "43A14", "Only a single VARBINARY, ARRAY, or nullable BINARY type may be referenced in a GROUP BY."),
- DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1069, "43A13", "Default column family not allowed on VIEW or shared INDEX."),
+ DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1069, "43A69", "Default column family not allowed on VIEW or shared INDEX."),
ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional."),
TX_MAY_NOT_SWITCH_TO_NON_TX(1071, "44A02", "A transactional table may not be switched to non transactional."),
STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional."),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cac03056/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 98b88f4..8fc0c7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -87,10 +87,10 @@ import org.apache.phoenix.schema.types.PDecimal;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
-import co.cask.tephra.TxConstants;
-
import com.google.common.collect.Lists;
+import co.cask.tephra.TxConstants;
+
public class IndexUtil {
public static final String INDEX_COLUMN_NAME_SEP = ":";
public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
@@ -129,7 +129,7 @@ public class IndexUtil {
if (PBinary.INSTANCE.equals(dataType)) {
return PVarbinary.INSTANCE;
}
- throw new IllegalArgumentException("Unsupported non nullable index type " + dataType);
+ throw new IllegalArgumentException("Unsupported non nullable type " + dataType);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cac03056/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index b3baa73..cfec967 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2136,6 +2136,42 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
}
@Test
+ public void testGroupByVarbinaryOrArray() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE T1 (PK VARCHAR PRIMARY KEY, c1 VARCHAR, c2 VARBINARY, C3 VARCHAR ARRAY, c4 VARBINARY, C5 VARCHAR ARRAY, C6 BINARY(10)) ");
+ try {
+ conn.createStatement().executeQuery("SELECT c1 FROM t1 GROUP BY c1,c2,c3");
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.UNSUPPORTED_GROUP_BY_EXPRESSIONS.getErrorCode(), e.getErrorCode());
+ }
+ try {
+ conn.createStatement().executeQuery("SELECT c1 FROM t1 GROUP BY c1,c3,c2");
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.UNSUPPORTED_GROUP_BY_EXPRESSIONS.getErrorCode(), e.getErrorCode());
+ }
+ try {
+ conn.createStatement().executeQuery("SELECT c1 FROM t1 GROUP BY c1,c2,c4");
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.UNSUPPORTED_GROUP_BY_EXPRESSIONS.getErrorCode(), e.getErrorCode());
+ }
+ try {
+ conn.createStatement().executeQuery("SELECT c1 FROM t1 GROUP BY c1,c3,c5");
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.UNSUPPORTED_GROUP_BY_EXPRESSIONS.getErrorCode(), e.getErrorCode());
+ }
+ try {
+ conn.createStatement().executeQuery("SELECT c1 FROM t1 GROUP BY c1,c6,c5");
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.UNSUPPORTED_GROUP_BY_EXPRESSIONS.getErrorCode(), e.getErrorCode());
+ }
+ }
+
+ @Test
public void testQueryWithSCN() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(1000));
[2/2] phoenix git commit: PHOENIX-2666 Performance regression:
Aggregate query with filter on table with multiple column families
Posted by ja...@apache.org.
PHOENIX-2666 Performance regression: Aggregate query with filter on table with multiple column families
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/28a8b802
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/28a8b802
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/28a8b802
Branch: refs/heads/master
Commit: 28a8b802c2a32f9735bf187f08ef0a9e33baf2dd
Parents: cac0305
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Feb 18 22:16:51 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Feb 18 22:16:51 2016 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/DeleteIT.java | 3 +-
.../StatsCollectorWithSplitsAndMultiCFIT.java | 3 +-
.../apache/phoenix/compile/DeleteCompiler.java | 6 +-
.../apache/phoenix/compile/UpsertCompiler.java | 7 +-
.../phoenix/filter/ColumnProjectionFilter.java | 2 +
.../phoenix/iterate/BaseResultIterators.java | 232 +++++++++++--------
.../apache/phoenix/iterate/ExplainTable.java | 28 +--
.../phoenix/compile/QueryCompilerTest.java | 29 +++
8 files changed, 189 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 745c730..6b4eead 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -205,7 +205,8 @@ public class DeleteIT extends BaseHBaseManagedTimeIT {
deleteStmt = "DELETE FROM IntIntKeyTest WHERE j IS NULL";
stmt = conn.prepareStatement(deleteStmt);
assertIndexUsed(conn, deleteStmt, indexName, createIndex);
- stmt.execute();
+ int deleteCount = stmt.executeUpdate();
+ assertEquals(3, deleteCount);
if (!autoCommit) {
conn.commit();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
index 13cd54c..d922ad9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
@@ -111,10 +111,9 @@ public class StatsCollectorWithSplitsAndMultiCFIT extends StatsCollectorAbstract
rs = conn.createStatement().executeQuery(
"SELECT COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH),COUNT(*) from SYSTEM.STATS where PHYSICAL_NAME = '"
- + STATS_TEST_TABLE_NAME_NEW + "' GROUP BY COLUMN_FAMILY");
+ + STATS_TEST_TABLE_NAME_NEW + "' GROUP BY COLUMN_FAMILY ORDER BY COLUMN_FAMILY");
assertTrue(rs.next());
- assertTrue(rs.next());
assertEquals("A", rs.getString(1));
assertEquals(25, rs.getInt(2));
assertEquals(12420, rs.getInt(3));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 4c41f82..8e9e1de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -508,7 +508,11 @@ public class DeleteCompiler {
// The coprocessor will delete each row returned from the scan
// Ignoring ORDER BY, since with auto commit on and no limit makes no difference
SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
- final RowProjector projector = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+ RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+ if (plan.getProjector().projectEveryRow()) {
+ projectorToBe = new RowProjector(projectorToBe,true);
+ }
+ final RowProjector projector = projectorToBe;
final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
mutationPlans.add(new MutationPlan() {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 0f7f6f9..6ec7f70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -636,7 +636,12 @@ public class UpsertCompiler {
PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
- final RowProjector aggProjector = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY);
+ RowProjector aggProjectorToBe = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY);
+ if (queryPlan.getProjector().projectEveryRow()) {
+ aggProjectorToBe = new RowProjector(aggProjectorToBe,true);
+ }
+ final RowProjector aggProjector = aggProjectorToBe;
+
/*
* Transfer over PTable representing subset of columns selected, but all PK columns.
* Move columns setting PK first in pkSlot order, adding LiteralExpression of null for any missing ones.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index cf9f7ab..b8b0350 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -66,6 +66,7 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
this.conditionOnlyCfs = conditionOnlyCfs;
}
+ @Override
public void readFields(DataInput input) throws IOException {
this.emptyCFName = WritableUtils.readCompressedByteArray(input);
int familyMapSize = WritableUtils.readVInt(input);
@@ -93,6 +94,7 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
}
}
+ @Override
public void write(DataOutput output) throws IOException {
WritableUtils.writeCompressedByteArray(output, this.emptyCFName);
WritableUtils.writeVInt(output, this.columnsTracker.size());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index d8256d7..fa09704 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -64,6 +64,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -107,6 +108,7 @@ import com.google.common.collect.Lists;
public abstract class BaseResultIterators extends ExplainTable implements ResultIterators {
private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class);
private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
+ private static final int MIN_SEEK_TO_COLUMN_VERSION = VersionUtil.encodeVersion("0", "94", "12");
private final List<List<Scan>> scans;
private final List<KeyRange> splits;
@@ -167,37 +169,40 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
} else {
FilterableStatement statement = plan.getStatement();
RowProjector projector = plan.getProjector();
+ boolean optimizeProjection = false;
boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereConditionColumns().isEmpty();
if (!projector.projectEverything()) {
// If nothing projected into scan and we only have one column family, just allow everything
// to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
// be quite a bit faster.
// Where condition columns also will get added into familyMap
- // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning.
- if (familyMap.isEmpty() && context.getWhereConditionColumns().isEmpty()
- && table.getColumnFamilies().size() == 1) {
+ // When where conditions are present, we cannot add FirstKeyOnlyFilter at beginning.
+ // FIXME: we only enter this if the number of column families is 1 because otherwise
+ // local indexes break because it appears that the column families in the PTable do
+ // not match the actual column families of the table (which is bad).
+ if (keyOnlyFilter && table.getColumnFamilies().size() == 1) {
// Project the one column family. We must project a column family since it's possible
// that there are other non declared column families that we need to ignore.
scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
- } else if (projector.projectEveryRow()) {
- if (table.getViewType() == ViewType.MAPPED) {
- // Since we don't have the empty key value in MAPPED tables,
- // we must select all CFs in HRS. However, only the
- // selected column values are returned back to client.
- for (PColumnFamily family : table.getColumnFamilies()) {
- scan.addFamily(family.getName().getBytes());
- }
- } else {
- byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
- // Project empty key value unless the column family containing it has
- // been projected in its entirety.
- if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
- scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
- }
- }
} else {
- for (Pair<byte[], byte[]> whereColumn : context.getWhereConditionColumns()) {
- scan.addColumn(whereColumn.getFirst(), whereColumn.getSecond());
+ optimizeProjection = true;
+ if (projector.projectEveryRow()) {
+ if (table.getViewType() == ViewType.MAPPED) {
+ // Since we don't have the empty key value in MAPPED tables,
+ // we must project all CFs in HRS. However, only the
+ // selected column values are returned back to client.
+ context.getWhereConditionColumns().clear();
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ context.addWhereCoditionColumn(family.getName().getBytes(), null);
+ }
+ } else {
+ byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+ // Project empty key value unless the column family containing it has
+ // been projected in its entirety.
+ if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
+ scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+ }
+ }
}
}
}
@@ -210,10 +215,113 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
}
- doColumnProjectionOptimization(context, scan, table, statement);
+ if (optimizeProjection) {
+ optimizeProjection(context, scan, table, statement);
+ }
}
}
+ private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
+ Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+ // columnsTracker contains the cf -> qualifiers mapping of what should get returned.
+ Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker =
+ new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
+ Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ int referencedCfCount = familyMap.size();
+ boolean filteredColumnNotInProjection = false;
+ for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+ byte[] filteredFamily = whereCol.getFirst();
+ if (!(familyMap.containsKey(filteredFamily))) {
+ referencedCfCount++;
+ filteredColumnNotInProjection = true;
+ } else if (!filteredColumnNotInProjection) {
+ NavigableSet<byte[]> projectedColumns = familyMap.get(filteredFamily);
+ if (projectedColumns != null) {
+ byte[] filteredColumn = whereCol.getSecond();
+ if (filteredColumn == null) {
+ filteredColumnNotInProjection = true;
+ } else {
+ filteredColumnNotInProjection = !projectedColumns.contains(filteredColumn);
+ }
+ }
+ }
+ }
+ boolean preventSeekToColumn;
+ if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
+ // Allow seeking to column during filtering
+ preventSeekToColumn = false;
+ } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
+ // Prevent seeking to column during filtering
+ preventSeekToColumn = true;
+ } else {
+ int hbaseServerVersion = context.getConnection().getQueryServices().getLowestClusterHBaseVersion();
+ // When only a single column family is referenced, there are no hints, and the HBase server
+ // version is older than the one containing the fix for HBASE-13109 (0.98.12), we prevent seeking
+ // to a column. NOTE(review): MIN_SEEK_TO_COLUMN_VERSION is encoded as 0.94.12, not 0.98.12 - confirm.
+ preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
+ }
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+ NavigableSet<byte[]> qs = entry.getValue();
+ NavigableSet<ImmutableBytesPtr> cols = null;
+ if (qs != null) {
+ cols = new TreeSet<ImmutableBytesPtr>();
+ for (byte[] q : qs) {
+ cols.add(new ImmutableBytesPtr(q));
+ }
+ }
+ columnsTracker.put(cf, cols);
+ }
+ // Make sure that the CFs of the where condition columns get scanned at the HRS.
+ for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+ byte[] family = whereCol.getFirst();
+ if (preventSeekToColumn) {
+ if (!(familyMap.containsKey(family))) {
+ conditionOnlyCfs.add(family);
+ }
+ scan.addFamily(family);
+ } else {
+ if (familyMap.containsKey(family)) {
+ // The where column's CF is already present. If specific columns were added for this CF, we
+ // need to ensure that this where column gets added to it as well.
+ // If the select was like select cf1.*, that by itself selects the whole CF, so there is no need
+ // to add the where column explicitly. Adding it would remove the cf1.* projection and only
+ // this where condition column would get returned!
+ NavigableSet<byte[]> cols = familyMap.get(family);
+ // A null cols means the whole CF will get scanned.
+ if (cols != null) {
+ if (whereCol.getSecond() == null) {
+ scan.addFamily(family);
+ } else {
+ scan.addColumn(family, whereCol.getSecond());
+ }
+ }
+ } else if (whereCol.getSecond() == null) {
+ scan.addFamily(family);
+ } else {
+ // The where column's CF itself is not present in the family map, so we need to add the column.
+ scan.addColumn(family, whereCol.getSecond());
+ }
+ }
+ }
+ if (!columnsTracker.isEmpty()) {
+ if (preventSeekToColumn) {
+ for (ImmutableBytesPtr f : columnsTracker.keySet()) {
+ // This addFamily will remove the explicit cols from the scan familyMap so the entire family is used.
+ // We don't want the ExplicitColumnTracker to be used; instead we have the ColumnProjectionFilter.
+ scan.addFamily(f.get());
+ }
+ }
+ // We don't need this filter for aggregates, as we're not returning back what's
+ // in the scan in this case. We still want the other optimization that causes
+ // the ExplicitColumnTracker not to be used, though.
+ if (!statement.isAggregate() && filteredColumnNotInProjection) {
+ ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+ columnsTracker, conditionOnlyCfs));
+ }
+ }
+ }
+
public BaseResultIterators(QueryPlan plan, Integer perScanLimit, ParallelScanGrouper scanGrouper) throws SQLException {
super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit());
this.plan = plan;
@@ -243,86 +351,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
this.allFutures = Lists.newArrayListWithExpectedSize(1);
}
- private static void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
- Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
- if (familyMap != null && !familyMap.isEmpty()) {
- // columnsTracker contain cf -> qualifiers which should get returned.
- Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker =
- new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
- Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- int referencedCfCount = familyMap.size();
- for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
- if (!(familyMap.containsKey(whereCol.getFirst()))) {
- referencedCfCount++;
- }
- }
- boolean useOptimization;
- if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
- // Do not use the optimization
- useOptimization = false;
- } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
- // Strictly use the optimization
- useOptimization = true;
- } else {
- // when referencedCfCount is >1 and no Hints, we are not using the optimization
- useOptimization = referencedCfCount == 1;
- }
- if (useOptimization) {
- for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
- ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
- NavigableSet<byte[]> qs = entry.getValue();
- NavigableSet<ImmutableBytesPtr> cols = null;
- if (qs != null) {
- cols = new TreeSet<ImmutableBytesPtr>();
- for (byte[] q : qs) {
- cols.add(new ImmutableBytesPtr(q));
- }
- }
- columnsTracker.put(cf, cols);
- }
- }
- // Making sure that where condition CFs are getting scanned at HRS.
- for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
- if (useOptimization) {
- if (!(familyMap.containsKey(whereCol.getFirst()))) {
- scan.addFamily(whereCol.getFirst());
- conditionOnlyCfs.add(whereCol.getFirst());
- }
- } else {
- if (familyMap.containsKey(whereCol.getFirst())) {
- // where column's CF is present. If there are some specific columns added against this CF, we
- // need to ensure this where column also getting added in it.
- // If the select was like select cf1.*, then that itself will select the whole CF. So no need to
- // specifically add the where column. Adding that will remove the cf1.* stuff and only this
- // where condition column will get returned!
- NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst());
- // cols is null means the whole CF will get scanned.
- if (cols != null) {
- scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
- }
- } else {
- // where column's CF itself is not present in family map. We need to add the column
- scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
- }
- }
- }
- if (useOptimization && !columnsTracker.isEmpty()) {
- for (ImmutableBytesPtr f : columnsTracker.keySet()) {
- // This addFamily will remove explicit cols in scan familyMap and make it as entire row.
- // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter
- scan.addFamily(f.get());
- }
- // We don't need this filter for aggregates, as we're not returning back what's
- // in the scan in this case. We still want the other optimization that causes
- // the ExplicitColumnTracker not to be used, though.
- if (!(statement.isAggregate())) {
- ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
- columnsTracker, conditionOnlyCfs));
- }
- }
- }
- }
-
@Override
public List<KeyRange> getSplits() {
if (splits == null)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 1fa4526..4a71483 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -130,11 +130,11 @@ public abstract class ExplainTable {
planSteps.add(" ROW TIMESTAMP FILTER [" + range.getMin() + ", " + range.getMax() + ")");
}
+ PageFilter pageFilter = null;
+ FirstKeyOnlyFilter firstKeyOnlyFilter = null;
+ BooleanExpressionFilter whereFilter = null;
Iterator<Filter> filterIterator = ScanUtil.getFilterIterator(scan);
if (filterIterator.hasNext()) {
- PageFilter pageFilter = null;
- FirstKeyOnlyFilter firstKeyOnlyFilter = null;
- BooleanExpressionFilter whereFilter = null;
do {
Filter filter = filterIterator.next();
if (filter instanceof FirstKeyOnlyFilter) {
@@ -145,17 +145,17 @@ public abstract class ExplainTable {
whereFilter = (BooleanExpressionFilter)filter;
}
} while (filterIterator.hasNext());
- if (whereFilter != null) {
- planSteps.add(" SERVER FILTER BY " + (firstKeyOnlyFilter == null ? "" : "FIRST KEY ONLY AND ") + whereFilter.toString());
- } else if (firstKeyOnlyFilter != null) {
- planSteps.add(" SERVER FILTER BY FIRST KEY ONLY");
- }
- if (!orderBy.getOrderByExpressions().isEmpty() && groupBy.isEmpty()) { // with GROUP BY, sort happens client-side
- planSteps.add(" SERVER" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S"))
- + " SORTED BY " + orderBy.getOrderByExpressions().toString());
- } else if (pageFilter != null) {
- planSteps.add(" SERVER " + pageFilter.getPageSize() + " ROW LIMIT");
- }
+ }
+ if (whereFilter != null) {
+ planSteps.add(" SERVER FILTER BY " + (firstKeyOnlyFilter == null ? "" : "FIRST KEY ONLY AND ") + whereFilter.toString());
+ } else if (firstKeyOnlyFilter != null) {
+ planSteps.add(" SERVER FILTER BY FIRST KEY ONLY");
+ }
+ if (!orderBy.getOrderByExpressions().isEmpty() && groupBy.isEmpty()) { // with GROUP BY, sort happens client-side
+ planSteps.add(" SERVER" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S"))
+ + " SORTED BY " + orderBy.getOrderByExpressions().toString());
+ } else if (pageFilter != null) {
+ planSteps.add(" SERVER " + pageFilter.getPageSize() + " ROW LIMIT");
}
Integer groupByLimit = null;
byte[] groupByLimitBytes = scan.getAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/28a8b802/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index cfec967..ce38cfd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -37,10 +37,12 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -52,6 +54,7 @@ import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.CountAggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.expression.function.TimeUnit;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
@@ -68,6 +71,7 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Test;
@@ -2213,5 +2217,30 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
}
}
+ // Reports whether the filters attached to the given scan (as enumerated by
+ // ScanUtil.getFilterIterator) include at least one ColumnProjectionFilter.
+ private static boolean hasColumnProjectionFilter(Scan scan) {
+     for (Iterator<Filter> remaining = ScanUtil.getFilterIterator(scan); remaining.hasNext();) {
+         if (remaining.next() instanceof ColumnProjectionFilter) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ @Test
+ public void testColumnProjectionOptimized() throws SQLException { // Checks which scans returned by projectQuery carry a ColumnProjectionFilter.
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute("CREATE TABLE t(k INTEGER PRIMARY KEY, a.v1 VARCHAR, a.v1b VARCHAR, b.v2 VARCHAR, c.v3 VARCHAR)"); // k is the PK; v1 and v1b live in family a, v2 in b, v3 in c
+ assertTrue(hasColumnProjectionFilter(projectQuery("SELECT k, v1 FROM t WHERE v2 = 'foo'"))); // projects a.v1 but filters on unprojected b.v2: filter expected
+ assertFalse(hasColumnProjectionFilter(projectQuery("SELECT k, v1 FROM t WHERE v1 = 'foo'"))); // filters on a.v1, which is also projected: no filter expected
+ assertFalse(hasColumnProjectionFilter(projectQuery("SELECT v1,v2 FROM t WHERE v1 = 'foo'"))); // projects a.v1 and b.v2, filters on projected a.v1: no filter expected
+ assertTrue(hasColumnProjectionFilter(projectQuery("SELECT v1,v2 FROM t WHERE v1 = 'foo' and v2 = 'bar' and v3 = 'bas'"))); // additionally filters on c.v3, which is not projected: filter expected
+ assertFalse(hasColumnProjectionFilter(projectQuery("SELECT a.* FROM t WHERE v1 = 'foo' and v1b = 'bar'"))); // whole family a projected, filters only on columns of a: no filter expected
+ } finally {
+ conn.close(); // always release the connection, even if an assertion above fails
+ }
+ }
}