You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/11/14 21:30:47 UTC
[1/5] phoenix git commit: PHOENIX-1799 Support many-to-many joins
Repository: phoenix
Updated Branches:
refs/heads/master 9b285dae0 -> eddc846d8
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a715a796/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
new file mode 100644
index 0000000..03eda06
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.MappedByteBufferQueue;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ResultUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+
+public class SortMergeJoinPlan implements QueryPlan {
+ private static final byte[] EMPTY_PTR = new byte[0];
+
+ private final StatementContext context;
+ private final FilterableStatement statement;
+ private final TableRef table;
+ private final JoinType type;
+ private final QueryPlan lhsPlan;
+ private final QueryPlan rhsPlan;
+ private final List<Expression> lhsKeyExpressions;
+ private final List<Expression> rhsKeyExpressions;
+ private final KeyValueSchema joinedSchema;
+ private final KeyValueSchema lhsSchema;
+ private final KeyValueSchema rhsSchema;
+ private final int rhsFieldPosition;
+
+ public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table,
+ JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
+ PTable joinedTable, PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
+ if (type == JoinType.Right) throw new IllegalArgumentException("JoinType should not be " + type);
+ this.context = context;
+ this.statement = statement;
+ this.table = table;
+ this.type = type;
+ this.lhsPlan = lhsPlan;
+ this.rhsPlan = rhsPlan;
+ this.lhsKeyExpressions = lhsKeyExpressions;
+ this.rhsKeyExpressions = rhsKeyExpressions;
+ this.joinedSchema = buildSchema(joinedTable);
+ this.lhsSchema = buildSchema(lhsTable);
+ this.rhsSchema = buildSchema(rhsTable);
+ this.rhsFieldPosition = rhsFieldPosition;
+ }
+
+ private static KeyValueSchema buildSchema(PTable table) {
+ KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+ if (table != null) {
+ for (PColumn column : table.getColumns()) {
+ if (!SchemaUtil.isPKColumn(column)) {
+ builder.addField(column);
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ return type == JoinType.Semi || type == JoinType.Anti ?
+ new SemiAntiJoinIterator(lhsPlan.iterator(), rhsPlan.iterator()) :
+ new BasicJoinIterator(lhsPlan.iterator(), rhsPlan.iterator());
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> steps = Lists.newArrayList();
+ steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") TABLES");
+ for (String step : lhsPlan.getExplainPlan().getPlanSteps()) {
+ steps.add(" " + step);
+ }
+ steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : ""));
+ for (String step : rhsPlan.getExplainPlan().getPlanSteps()) {
+ steps.add(" " + step);
+ }
+ return new ExplainPlan(steps);
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return lhsPlan.getEstimatedSize() + rhsPlan.getEstimatedSize();
+ }
+
+ @Override
+ public TableRef getTableRef() {
+ return table;
+ }
+
+ @Override
+ public RowProjector getProjector() {
+ return null;
+ }
+
+ @Override
+ public Integer getLimit() {
+ return null;
+ }
+
+ @Override
+ public OrderBy getOrderBy() {
+ return null;
+ }
+
+ @Override
+ public GroupBy getGroupBy() {
+ return null;
+ }
+
+ @Override
+ public List<KeyRange> getSplits() {
+ return Collections.<KeyRange> emptyList();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return Collections.<List<Scan>> emptyList();
+ }
+
+ @Override
+ public FilterableStatement getStatement() {
+ return statement;
+ }
+
+ @Override
+ public boolean isDegenerate() {
+ return false;
+ }
+
+ @Override
+ public boolean isRowKeyOrdered() {
+ return false;
+ }
+
+ private class BasicJoinIterator implements ResultIterator {
+ private final ResultIterator lhsIterator;
+ private final ResultIterator rhsIterator;
+ private boolean initialized;
+ private Tuple lhsTuple;
+ private Tuple rhsTuple;
+ private JoinKey lhsKey;
+ private JoinKey rhsKey;
+ private Tuple nextLhsTuple;
+ private Tuple nextRhsTuple;
+ private JoinKey nextLhsKey;
+ private JoinKey nextRhsKey;
+ private ValueBitSet destBitSet;
+ private ValueBitSet lhsBitSet;
+ private ValueBitSet rhsBitSet;
+ private byte[] emptyProjectedValue;
+ private MappedByteBufferTupleQueue queue;
+ private Iterator<Tuple> queueIterator;
+
+ public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
+ this.lhsIterator = lhsIterator;
+ this.rhsIterator = rhsIterator;
+ this.initialized = false;
+ this.lhsTuple = null;
+ this.rhsTuple = null;
+ this.lhsKey = new JoinKey(lhsKeyExpressions);
+ this.rhsKey = new JoinKey(rhsKeyExpressions);
+ this.nextLhsTuple = null;
+ this.nextRhsTuple = null;
+ this.nextLhsKey = new JoinKey(lhsKeyExpressions);
+ this.nextRhsKey = new JoinKey(rhsKeyExpressions);
+ this.destBitSet = ValueBitSet.newInstance(joinedSchema);
+ this.lhsBitSet = ValueBitSet.newInstance(lhsSchema);
+ this.rhsBitSet = ValueBitSet.newInstance(rhsSchema);
+ lhsBitSet.clear();
+ int len = lhsBitSet.getEstimatedLength();
+ this.emptyProjectedValue = new byte[len];
+ lhsBitSet.toBytes(emptyProjectedValue, 0);
+ int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+ QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+ this.queue = new MappedByteBufferTupleQueue(thresholdBytes);
+ this.queueIterator = null;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ lhsIterator.close();
+ rhsIterator.close();
+ queue.close();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ if (!initialized) {
+ init();
+ }
+
+ Tuple next = null;
+ while (next == null && !isEnd()) {
+ if (queueIterator != null) {
+ if (queueIterator.hasNext()) {
+ next = join(lhsTuple, queueIterator.next());
+ } else {
+ boolean eq = nextLhsTuple != null && lhsKey.equals(nextLhsKey);
+ advance(true);
+ if (eq) {
+ queueIterator = queue.iterator();
+ } else {
+ queue.clear();
+ queueIterator = null;
+ }
+ }
+ } else if (lhsTuple != null) {
+ if (rhsTuple != null) {
+ if (lhsKey.equals(rhsKey)) {
+ next = join(lhsTuple, rhsTuple);
+ if (nextLhsTuple != null && lhsKey.equals(nextLhsKey)) {
+ queue.offer(rhsTuple);
+ if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) {
+ queueIterator = queue.iterator();
+ advance(true);
+ }
+ } else if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) {
+ advance(true);
+ }
+ advance(false);
+ } else if (lhsKey.compareTo(rhsKey) < 0) {
+ if (type == JoinType.Full || type == JoinType.Left) {
+ next = join(lhsTuple, null);
+ }
+ advance(true);
+ } else {
+ if (type == JoinType.Full) {
+ next = join(null, rhsTuple);
+ }
+ advance(false);
+ }
+ } else { // left-join or full-join
+ next = join(lhsTuple, null);
+ advance(true);
+ }
+ } else { // full-join
+ next = join(null, rhsTuple);
+ advance(false);
+ }
+ }
+
+ return next;
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ }
+
+ private void init() throws SQLException {
+ nextLhsTuple = lhsIterator.next();
+ if (nextLhsTuple != null) {
+ nextLhsKey.evaluate(nextLhsTuple);
+ }
+ advance(true);
+ nextRhsTuple = rhsIterator.next();
+ if (nextRhsTuple != null) {
+ nextRhsKey.evaluate(nextRhsTuple);
+ }
+ advance(false);
+ initialized = true;
+ }
+
+ private void advance(boolean lhs) throws SQLException {
+ if (lhs) {
+ lhsTuple = nextLhsTuple;
+ lhsKey.set(nextLhsKey);
+ if (lhsTuple != null) {
+ nextLhsTuple = lhsIterator.next();
+ if (nextLhsTuple != null) {
+ nextLhsKey.evaluate(nextLhsTuple);
+ } else {
+ nextLhsKey.clear();
+ }
+ }
+ } else {
+ rhsTuple = nextRhsTuple;
+ rhsKey.set(nextRhsKey);
+ if (rhsTuple != null) {
+ nextRhsTuple = rhsIterator.next();
+ if (nextRhsTuple != null) {
+ nextRhsKey.evaluate(nextRhsTuple);
+ } else {
+ nextRhsKey.clear();
+ }
+ }
+ }
+ }
+
+ private boolean isEnd() {
+ return (lhsTuple == null && (rhsTuple == null || type != JoinType.Full))
+ || (queueIterator == null && rhsTuple == null && type == JoinType.Inner);
+ }
+
+ private Tuple join(Tuple lhs, Tuple rhs) throws SQLException {
+ try {
+ ProjectedValueTuple t = null;
+ if (lhs == null) {
+ t = new ProjectedValueTuple(rhs, rhs.getValue(0).getTimestamp(),
+ this.emptyProjectedValue, 0, this.emptyProjectedValue.length,
+ this.emptyProjectedValue.length);
+ } else if (lhs instanceof ProjectedValueTuple) {
+ t = (ProjectedValueTuple) lhs;
+ } else {
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ TupleProjector.decodeProjectedValue(lhs, ptr);
+ lhsBitSet.clear();
+ lhsBitSet.or(ptr);
+ int bitSetLen = lhsBitSet.getEstimatedLength();
+ t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(),
+ ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen);
+
+ }
+ return rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
+ t : TupleProjector.mergeProjectedValue(
+ t, joinedSchema, destBitSet,
+ rhs, rhsSchema, rhsBitSet, rhsFieldPosition);
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ }
+ }
+
+ private class SemiAntiJoinIterator implements ResultIterator {
+ private final ResultIterator lhsIterator;
+ private final ResultIterator rhsIterator;
+ private final boolean isSemi;
+ private boolean initialized;
+ private Tuple lhsTuple;
+ private Tuple rhsTuple;
+ private JoinKey lhsKey;
+ private JoinKey rhsKey;
+
+ public SemiAntiJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
+ if (type != JoinType.Semi && type != JoinType.Anti) throw new IllegalArgumentException("Type " + type + " is not allowed by " + SemiAntiJoinIterator.class.getName());
+ this.lhsIterator = lhsIterator;
+ this.rhsIterator = rhsIterator;
+ this.isSemi = type == JoinType.Semi;
+ this.initialized = false;
+ this.lhsTuple = null;
+ this.rhsTuple = null;
+ this.lhsKey = new JoinKey(lhsKeyExpressions);
+ this.rhsKey = new JoinKey(rhsKeyExpressions);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ lhsIterator.close();
+ rhsIterator.close();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ if (!initialized) {
+ advance(true);
+ advance(false);
+ initialized = true;
+ }
+
+ Tuple next = null;
+ while (lhsTuple != null && next == null) {
+ if (rhsTuple != null) {
+ if (lhsKey.equals(rhsKey)) {
+ if (isSemi) {
+ next = lhsTuple;
+ }
+ advance(true);
+ } else if (lhsKey.compareTo(rhsKey) < 0) {
+ if (!isSemi) {
+ next = lhsTuple;
+ }
+ advance(true);
+ } else {
+ advance(false);
+ }
+ } else {
+ if (!isSemi) {
+ next = lhsTuple;
+ }
+ advance(true);
+ }
+ }
+
+ return next;
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ }
+
+ private void advance(boolean lhs) throws SQLException {
+ if (lhs) {
+ lhsTuple = lhsIterator.next();
+ if (lhsTuple != null) {
+ lhsKey.evaluate(lhsTuple);
+ } else {
+ lhsKey.clear();
+ }
+ } else {
+ rhsTuple = rhsIterator.next();
+ if (rhsTuple != null) {
+ rhsKey.evaluate(rhsTuple);
+ } else {
+ rhsKey.clear();
+ }
+ }
+ }
+ }
+
+ private static class JoinKey implements Comparable<JoinKey> {
+ private final List<Expression> expressions;
+ private final List<ImmutableBytesWritable> keys;
+
+ public JoinKey(List<Expression> expressions) {
+ this.expressions = expressions;
+ this.keys = Lists.newArrayListWithExpectedSize(expressions.size());
+ for (int i = 0; i < expressions.size(); i++) {
+ this.keys.add(new ImmutableBytesWritable());
+ }
+ }
+
+ public void evaluate(Tuple tuple) {
+ for (int i = 0; i < keys.size(); i++) {
+ if (!expressions.get(i).evaluate(tuple, keys.get(i))) {
+ keys.get(i).set(EMPTY_PTR);
+ }
+ }
+ }
+
+ public void set(JoinKey other) {
+ for (int i = 0; i < keys.size(); i++) {
+ ImmutableBytesWritable key = other.keys.get(i);
+ this.keys.get(i).set(key.get(), key.getOffset(), key.getLength());
+ }
+ }
+
+ public void clear() {
+ for (int i = 0; i < keys.size(); i++) {
+ this.keys.get(i).set(EMPTY_PTR);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof JoinKey))
+ return false;
+ return this.compareTo((JoinKey) other) == 0;
+ }
+
+ @Override
+ public int compareTo(JoinKey other) {
+ for (int i = 0; i < keys.size(); i++) {
+ int comp = this.keys.get(i).compareTo(other.keys.get(i));
+ if (comp != 0)
+ return comp;
+ }
+
+ return 0;
+ }
+ }
+
+ private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> {
+
+ public MappedByteBufferTupleQueue(int thresholdBytes) {
+ super(thresholdBytes);
+ }
+
+ @Override
+ protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue(
+ int index, int thresholdBytes) {
+ return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false);
+ }
+
+ @Override
+ protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() {
+ return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() {
+ @Override
+ public int compare(MappedByteBufferSegmentQueue<Tuple> q1,
+ MappedByteBufferSegmentQueue<Tuple> q2) {
+ return q1.index() - q2.index();
+ }
+ };
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new Iterator<Tuple>() {
+ private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter;
+ private Iterator<Tuple> currentIter;
+ {
+ this.queueIter = getSegmentQueues().iterator();
+ this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentIter != null && currentIter.hasNext();
+ }
+
+ @Override
+ public Tuple next() {
+ if (!hasNext())
+ return null;
+
+ Tuple ret = currentIter.next();
+ if (!currentIter.hasNext()) {
+ this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+
+ private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> {
+ private LinkedList<Tuple> results;
+
+ public MappedByteBufferTupleSegmentQueue(int index,
+ int thresholdBytes, boolean hasMaxQueueSize) {
+ super(index, thresholdBytes, hasMaxQueueSize);
+ this.results = Lists.newLinkedList();
+ }
+
+ @Override
+ protected Queue<Tuple> getInMemoryQueue() {
+ return results;
+ }
+
+ @Override
+ protected int sizeOf(Tuple e) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0));
+ return Bytes.SIZEOF_INT * 2 + kv.getLength();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0));
+ buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT);
+ buffer.putInt(kv.getLength());
+ buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
+ }
+
+ @Override
+ protected Tuple readFromBuffer(MappedByteBuffer buffer) {
+ int length = buffer.getInt();
+ if (length < 0)
+ return null;
+
+ byte[] b = new byte[length];
+ buffer.get(b);
+ Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
+ return new ResultTuple(result);
+ }
+
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a715a796/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
new file mode 100644
index 0000000..8ada952
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.AbstractQueue;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.MinMaxPriorityQueue;
+
+public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
+ private final int thresholdBytes;
+ private List<MappedByteBufferSegmentQueue<T>> queues;
+ private int currentIndex;
+ private MappedByteBufferSegmentQueue<T> currentQueue;
+ private MinMaxPriorityQueue<MappedByteBufferSegmentQueue<T>> mergedQueue;
+
+ public MappedByteBufferQueue(int thresholdBytes) {
+ this.thresholdBytes = thresholdBytes;
+ this.queues = Lists.<MappedByteBufferSegmentQueue<T>> newArrayList();
+ this.currentIndex = -1;
+ this.currentQueue = null;
+ this.mergedQueue = null;
+ }
+
+ abstract protected MappedByteBufferSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
+
+ abstract protected Comparator<MappedByteBufferSegmentQueue<T>> getSegmentQueueComparator();
+
+ protected final List<MappedByteBufferSegmentQueue<T>> getSegmentQueues() {
+ return queues.subList(0, currentIndex + 1);
+ }
+
+ @Override
+ public boolean offer(T e) {
+ boolean startNewQueue = this.currentQueue == null || this.currentQueue.isFlushed();
+ if (startNewQueue) {
+ currentIndex++;
+ if (currentIndex < queues.size()) {
+ currentQueue = queues.get(currentIndex);
+ } else {
+ currentQueue = createSegmentQueue(currentIndex, thresholdBytes);
+ queues.add(currentQueue);
+ }
+ }
+
+ return this.currentQueue.offer(e);
+ }
+
+ @Override
+ public T poll() {
+ initMergedQueue();
+ if (mergedQueue != null && !mergedQueue.isEmpty()) {
+ MappedByteBufferSegmentQueue<T> queue = mergedQueue.poll();
+ T re = queue.poll();
+ if (queue.peek() != null) {
+ mergedQueue.add(queue);
+ }
+ return re;
+ }
+ return null;
+ }
+
+ @Override
+ public T peek() {
+ initMergedQueue();
+ if (mergedQueue != null && !mergedQueue.isEmpty()) {
+ return mergedQueue.peek().peek();
+ }
+ return null;
+ }
+
+ @Override
+ public void clear() {
+ for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+ queue.clear();
+ }
+ currentIndex = -1;
+ currentQueue = null;
+ mergedQueue = null;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ int size = 0;
+ for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+ size += queue.size();
+ }
+ return size;
+ }
+
+ public long getByteSize() {
+ return currentQueue == null ? 0 : currentQueue.getInMemByteSize();
+ }
+
+ public void close() {
+ for (MappedByteBufferSegmentQueue<T> queue : queues) {
+ queue.close();
+ }
+ queues.clear();
+ }
+
+ private void initMergedQueue() {
+ if (mergedQueue == null && currentIndex >= 0) {
+ mergedQueue = MinMaxPriorityQueue.<MappedByteBufferSegmentQueue<T>> orderedBy(
+ getSegmentQueueComparator()).maximumSize(currentIndex + 1).create();
+ for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+ T re = queue.peek();
+ if (re != null) {
+ mergedQueue.add(queue);
+ }
+ }
+ }
+ }
+
+ public abstract static class MappedByteBufferSegmentQueue<T> extends AbstractQueue<T> {
+ protected static final int EOF = -1;
+ // at least create 128 KB MappedByteBuffers
+ private static final long DEFAULT_MAPPING_SIZE = 128 * 1024;
+
+ private final int index;
+ private final int thresholdBytes;
+ private final boolean hasMaxQueueSize;
+ private long totalResultSize = 0;
+ private int maxResultSize = 0;
+ private long mappingSize = 0;
+ private File file;
+ private boolean isClosed = false;
+ private boolean flushBuffer = false;
+ private int flushedCount = 0;
+ private T current = null;
+ private SegmentQueueFileIterator thisIterator;
+ // iterators to close on close()
+ private List<SegmentQueueFileIterator> iterators;
+
+ public MappedByteBufferSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
+ this.index = index;
+ this.thresholdBytes = thresholdBytes;
+ this.hasMaxQueueSize = hasMaxQueueSize;
+ this.iterators = Lists.<SegmentQueueFileIterator> newArrayList();
+ }
+
+ abstract protected Queue<T> getInMemoryQueue();
+ abstract protected int sizeOf(T e);
+ abstract protected void writeToBuffer(MappedByteBuffer buffer, T e);
+ abstract protected T readFromBuffer(MappedByteBuffer buffer);
+
+ public int index() {
+ return this.index;
+ }
+
+ public int size() {
+ if (flushBuffer)
+ return flushedCount;
+ return getInMemoryQueue().size();
+ }
+
+ public long getInMemByteSize() {
+ if (flushBuffer)
+ return 0;
+ return totalResultSize;
+ }
+
+ public boolean isFlushed() {
+ return flushBuffer;
+ }
+
+ @Override
+ public boolean offer(T e) {
+ if (isClosed || flushBuffer)
+ return false;
+
+ boolean added = getInMemoryQueue().add(e);
+ if (added) {
+ try {
+ flush(e);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ return added;
+ }
+
+ @Override
+ public T peek() {
+ if (current == null && !isClosed) {
+ current = next();
+ }
+
+ return current;
+ }
+
+ @Override
+ public T poll() {
+ T ret = peek();
+ if (!isClosed) {
+ current = next();
+ } else {
+ current = null;
+ }
+
+ return ret;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ if (isClosed)
+ return null;
+
+ if (!flushBuffer)
+ return getInMemoryQueue().iterator();
+
+ SegmentQueueFileIterator iterator = new SegmentQueueFileIterator(thisIterator);
+ iterators.add(iterator);
+ return iterator;
+ }
+
+ @Override
+ public void clear() {
+ getInMemoryQueue().clear();
+ this.totalResultSize = 0;
+ this.maxResultSize = 0;
+ this.mappingSize = 0;
+ this.flushBuffer = false;
+ this.flushedCount = 0;
+ this.current = null;
+ if (thisIterator != null) {
+ thisIterator.close();
+ thisIterator = null;
+ }
+ for (SegmentQueueFileIterator iter : iterators) {
+ iter.close();
+ }
+ iterators.clear();
+ if (this.file != null) {
+ file.delete();
+ file = null;
+ }
+ }
+
+ public void close() {
+ if (!isClosed) {
+ clear();
+ this.isClosed = true;
+ }
+ }
+
+ private T next() {
+ T ret = null;
+ if (!flushBuffer) {
+ ret = getInMemoryQueue().poll();
+ } else {
+ if (thisIterator == null) {
+ thisIterator = new SegmentQueueFileIterator();
+ }
+ ret = thisIterator.next();
+ }
+
+ if (ret == null) {
+ close();
+ }
+
+ return ret;
+ }
+
+ private void flush(T entry) throws IOException {
+ Queue<T> inMemQueue = getInMemoryQueue();
+ int resultSize = sizeOf(entry);
+ maxResultSize = Math.max(maxResultSize, resultSize);
+ totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
+ if (totalResultSize >= thresholdBytes) {
+ this.file = File.createTempFile(UUID.randomUUID().toString(), null);
+ RandomAccessFile af = new RandomAccessFile(file, "rw");
+ FileChannel fc = af.getChannel();
+ int writeIndex = 0;
+ mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
+ MappedByteBuffer writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+
+ int resSize = inMemQueue.size();
+ for (int i = 0; i < resSize; i++) {
+ T e = inMemQueue.poll();
+ writeToBuffer(writeBuffer, e);
+ // buffer close to exhausted, re-map.
+ if (mappingSize - writeBuffer.position() < maxResultSize) {
+ writeIndex += writeBuffer.position();
+ writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+ }
+ }
+ writeBuffer.putInt(EOF); // end
+ fc.force(true);
+ fc.close();
+ af.close();
+ flushedCount = resSize;
+ inMemQueue.clear();
+ flushBuffer = true;
+ }
+ }
+
+ private class SegmentQueueFileIterator implements Iterator<T>, Closeable {
+ private boolean isEnd;
+ private long readIndex;
+ private RandomAccessFile af;
+ private FileChannel fc;
+ private MappedByteBuffer readBuffer;
+ private T next;
+
+ public SegmentQueueFileIterator() {
+ init(0);
+ }
+
+ public SegmentQueueFileIterator(SegmentQueueFileIterator iterator) {
+ if (iterator != null && iterator.isEnd) {
+ this.isEnd = true;
+ } else {
+ init(iterator == null ? 0 : iterator.readIndex);
+ }
+ }
+
+ private void init(long readIndex) {
+ this.isEnd = false;
+ this.readIndex = readIndex;
+ this.next = null;
+ try {
+ this.af = new RandomAccessFile(file, "r");
+ this.fc = af.getChannel();
+ this.readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!isEnd && next == null) {
+ next = readNext();
+ }
+
+ return next != null;
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext())
+ return null;
+
+ T ret = next;
+ next = readNext();
+ return ret;
+ }
+
+ private T readNext() {
+ if (isEnd)
+ return null;
+
+ T e = readFromBuffer(readBuffer);
+ if (e == null) {
+ close();
+ return null;
+ }
+
+ // buffer close to exhausted, re-map.
+ if (mappingSize - readBuffer.position() < maxResultSize) {
+ readIndex += readBuffer.position();
+ try {
+ readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ return e;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ this.isEnd = true;
+ if (this.fc != null) {
+ try {
+ this.fc.close();
+ } catch (IOException ignored) {
+ }
+ }
+ if (this.af != null) {
+ try {
+ this.af.close();
+ } catch (IOException ignored) {
+ }
+ this.af = null;
+ }
+ }
+ }
+ }
+}
+
[3/5] phoenix git commit: PHOENIX-1799 Support many-to-many joins
Posted by ma...@apache.org.
PHOENIX-1799 Support many-to-many joins
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a715a796
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a715a796
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a715a796
Branch: refs/heads/master
Commit: a715a796e4962a452ff321ea2252643b06d9ce01
Parents: 9b285da
Author: maryannxue <ma...@apache.org>
Authored: Fri Nov 14 15:27:40 2014 -0500
Committer: maryannxue <ma...@apache.org>
Committed: Fri Nov 14 15:27:40 2014 -0500
----------------------------------------------------------------------
.../apache/phoenix/end2end/SortMergeJoinIT.java | 2822 ++++++++++++++++++
.../phoenix/execute/SortMergeJoinPlan.java | 628 ++++
.../phoenix/iterate/MappedByteBufferQueue.java | 431 +++
3 files changed, 3881 insertions(+)
----------------------------------------------------------------------
[5/5] phoenix git commit: PHOENIX-1799 Support many-to-many joins
Posted by ma...@apache.org.
PHOENIX-1799 Support many-to-many joins
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/eddc846d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/eddc846d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/eddc846d
Branch: refs/heads/master
Commit: eddc846d85f97f3cc2206aba27a8d544c4db3384
Parents: a715a79
Author: maryannxue <ma...@apache.org>
Authored: Fri Nov 14 15:29:43 2014 -0500
Committer: maryannxue <ma...@apache.org>
Committed: Fri Nov 14 15:29:43 2014 -0500
----------------------------------------------------------------------
.../apache/phoenix/compile/GroupByCompiler.java | 4 +-
.../apache/phoenix/compile/JoinCompiler.java | 147 ++++----
.../apache/phoenix/compile/OrderByCompiler.java | 11 +-
.../apache/phoenix/compile/QueryCompiler.java | 191 +++++++---
.../phoenix/compile/SubselectRewriter.java | 23 +-
.../apache/phoenix/compile/UpsertCompiler.java | 5 +
.../apache/phoenix/compile/WhereCompiler.java | 4 +-
.../coprocessor/HashJoinRegionScanner.java | 17 +-
.../apache/phoenix/execute/TupleProjector.java | 59 +--
.../phoenix/iterate/FilterResultIterator.java | 3 +-
.../iterate/MappedByteBufferSortedQueue.java | 370 ++++---------------
.../java/org/apache/phoenix/parse/HintNode.java | 6 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 11 +
.../phoenix/parse/SelectStatementRewriter.java | 2 +-
.../org/apache/phoenix/schema/ValueBitSet.java | 11 +-
15 files changed, 406 insertions(+), 458 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
index a561a47..016cd52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
@@ -136,7 +136,7 @@ public class GroupByCompiler {
* @throws ColumnNotFoundException if column name could not be resolved
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
- public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector) throws SQLException {
+ public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector, boolean isInRowKeyOrder) throws SQLException {
List<ParseNode> groupByNodes = statement.getGroupBy();
/**
* Distinct can use an aggregate plan if there's no group by.
@@ -179,7 +179,7 @@ public class GroupByCompiler {
return GroupBy.EMPTY_GROUP_BY;
}
- boolean isRowKeyOrderedGrouping = groupByVisitor.isOrderPreserving();
+ boolean isRowKeyOrderedGrouping = isInRowKeyOrder && groupByVisitor.isOrderPreserving();
List<Expression> expressions = Lists.newArrayListWithCapacity(groupByEntries.size());
List<Expression> keyExpressions = expressions;
String groupExprAttribName;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index b519dc4..45b6603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.compile;
import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -293,6 +292,10 @@ public class JoinCompiler {
return columnRefs;
}
+ public ParseNode getPostFiltersCombined() {
+ return combine(postFilters);
+ }
+
public void addFilter(ParseNode filter) throws SQLException {
if (joinSpecs.isEmpty()) {
table.addFilter(filter);
@@ -320,7 +323,7 @@ public class JoinCompiler {
for (JoinSpec joinSpec : joinSpecs) {
JoinTable joinTable = joinSpec.getJoinTable();
boolean hasSubJoin = !joinTable.getJoinSpecs().isEmpty();
- for (ComparisonParseNode node : joinSpec.getOnConditions()) {
+ for (EqualParseNode node : joinSpec.getOnConditions()) {
node.getLHS().accept(generalRefVisitor);
if (hasSubJoin) {
node.getRHS().accept(generalRefVisitor);
@@ -384,13 +387,12 @@ public class JoinCompiler {
}
public SelectStatement getAsSingleSubquery(SelectStatement query, boolean asSubquery) throws SQLException {
- if (!isFlat(query))
- throw new SQLFeatureNotSupportedException("Complex subqueries not supported as left join table.");
+ assert (isFlat(query));
if (asSubquery)
return query;
- return NODE_FACTORY.select(query.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), query.getWhere(), select.getGroupBy(), select.getHaving(), select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence());
+ return NODE_FACTORY.select(select, query.getFrom(), query.getWhere());
}
public boolean hasPostReference() {
@@ -427,7 +429,7 @@ public class JoinCompiler {
public static class JoinSpec {
private final JoinType type;
- private final List<ComparisonParseNode> onConditions;
+ private final List<EqualParseNode> onConditions;
private final JoinTable joinTable;
private final boolean singleValueOnly;
private Set<TableRef> dependencies;
@@ -436,7 +438,7 @@ public class JoinCompiler {
private JoinSpec(JoinType type, ParseNode onNode, JoinTable joinTable,
boolean singleValueOnly, ColumnResolver resolver) throws SQLException {
this.type = type;
- this.onConditions = new ArrayList<ComparisonParseNode>();
+ this.onConditions = new ArrayList<EqualParseNode>();
this.joinTable = joinTable;
this.singleValueOnly = singleValueOnly;
this.dependencies = new HashSet<TableRef>();
@@ -454,7 +456,7 @@ public class JoinCompiler {
return type;
}
- public List<ComparisonParseNode> getOnConditions() {
+ public List<EqualParseNode> getOnConditions() {
return onConditions;
}
@@ -470,75 +472,63 @@ public class JoinCompiler {
return dependencies;
}
- public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext context, ColumnResolver leftResolver, ColumnResolver rightResolver) throws SQLException {
+ public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, boolean sortExpressions) throws SQLException {
if (onConditions.isEmpty()) {
return new Pair<List<Expression>, List<Expression>>(
Collections.<Expression> singletonList(LiteralExpression.newConstant(1)),
Collections.<Expression> singletonList(LiteralExpression.newConstant(1)));
}
- ColumnResolver resolver = context.getResolver();
- List<Pair<Expression, Expression>> compiled = new ArrayList<Pair<Expression, Expression>>(onConditions.size());
- context.setResolver(leftResolver);
- ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
- for (ParseNode condition : onConditions) {
- assert (condition instanceof EqualParseNode);
- EqualParseNode equalNode = (EqualParseNode) condition;
- expressionCompiler.reset();
- Expression left = equalNode.getLHS().accept(expressionCompiler);
- compiled.add(new Pair<Expression, Expression>(left, null));
- }
- context.setResolver(rightResolver);
- expressionCompiler = new ExpressionCompiler(context);
- Iterator<Pair<Expression, Expression>> iter = compiled.iterator();
- for (ParseNode condition : onConditions) {
- Pair<Expression, Expression> p = iter.next();
- EqualParseNode equalNode = (EqualParseNode) condition;
- expressionCompiler.reset();
- Expression right = equalNode.getRHS().accept(expressionCompiler);
- Expression left = p.getFirst();
+ List<Pair<Expression, Expression>> compiled = Lists.<Pair<Expression, Expression>> newArrayListWithExpectedSize(onConditions.size());
+ ExpressionCompiler lhsCompiler = new ExpressionCompiler(lhsCtx);
+ ExpressionCompiler rhsCompiler = new ExpressionCompiler(rhsCtx);
+ for (EqualParseNode condition : onConditions) {
+ lhsCompiler.reset();
+ Expression left = condition.getLHS().accept(lhsCompiler);
+ rhsCompiler.reset();
+ Expression right = condition.getRHS().accept(rhsCompiler);
PDataType toType = getCommonType(left.getDataType(), right.getDataType());
if (left.getDataType() != toType) {
left = CoerceExpression.create(left, toType);
- p.setFirst(left);
}
if (right.getDataType() != toType) {
right = CoerceExpression.create(right, toType);
}
- p.setSecond(right);
+ compiled.add(new Pair<Expression, Expression>(left, right));
}
- context.setResolver(resolver); // recover the resolver
- Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
- @Override
- public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) {
- Expression e1 = o1.getFirst();
- Expression e2 = o2.getFirst();
- boolean isFixed1 = e1.getDataType().isFixedWidth();
- boolean isFixed2 = e2.getDataType().isFixedWidth();
- boolean isFixedNullable1 = e1.isNullable() &&isFixed1;
- boolean isFixedNullable2 = e2.isNullable() && isFixed2;
- if (isFixedNullable1 == isFixedNullable2) {
- if (isFixed1 == isFixed2) {
- return 0;
- } else if (isFixed1) {
- return -1;
- } else {
+ if (sortExpressions) {
+ Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
+ @Override
+ public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) {
+ Expression e1 = o1.getFirst();
+ Expression e2 = o2.getFirst();
+ boolean isFixed1 = e1.getDataType().isFixedWidth();
+ boolean isFixed2 = e2.getDataType().isFixedWidth();
+ boolean isFixedNullable1 = e1.isNullable() &&isFixed1;
+ boolean isFixedNullable2 = e2.isNullable() && isFixed2;
+ if (isFixedNullable1 == isFixedNullable2) {
+ if (isFixed1 == isFixed2) {
+ return 0;
+ } else if (isFixed1) {
+ return -1;
+ } else {
+ return 1;
+ }
+ } else if (isFixedNullable1) {
return 1;
+ } else {
+ return -1;
}
- } else if (isFixedNullable1) {
- return 1;
- } else {
- return -1;
}
- }
- });
- List<Expression> lConditions = new ArrayList<Expression>(compiled.size());
- List<Expression> rConditions = new ArrayList<Expression>(compiled.size());
+ });
+ }
+ List<Expression> lConditions = Lists.<Expression> newArrayListWithExpectedSize(compiled.size());
+ List<Expression> rConditions = Lists.<Expression> newArrayListWithExpectedSize(compiled.size());
for (Pair<Expression, Expression> pair : compiled) {
lConditions.add(pair.getFirst());
rConditions.add(pair.getSecond());
}
-
+
return new Pair<List<Expression>, List<Expression>>(lConditions, rConditions);
}
@@ -683,11 +673,11 @@ public class JoinCompiler {
return JoinCompiler.compilePostFilterExpression(context, postFilters);
}
- public SelectStatement getAsSubquery() throws SQLException {
+ public SelectStatement getAsSubquery(List<OrderByNode> orderBy) throws SQLException {
if (isSubselect())
- return SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias());
+ return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias());
- return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false, select.hasSequence());
+ return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence());
}
public boolean hasFilters() {
@@ -912,12 +902,12 @@ public class JoinCompiler {
}
private static class OnNodeVisitor extends BooleanParseNodeVisitor<Void> {
- private List<ComparisonParseNode> onConditions;
+ private List<EqualParseNode> onConditions;
private Set<TableRef> dependencies;
private JoinTable joinTable;
private ColumnRefParseNodeVisitor columnRefVisitor;
- public OnNodeVisitor(ColumnResolver resolver, List<ComparisonParseNode> onConditions,
+ public OnNodeVisitor(ColumnResolver resolver, List<EqualParseNode> onConditions,
Set<TableRef> dependencies, JoinTable joinTable) {
this.onConditions = onConditions;
this.dependencies = dependencies;
@@ -981,7 +971,7 @@ public class JoinCompiler {
joinTable.addFilter(node);
} else if (lhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY
&& rhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
- onConditions.add(node);
+ onConditions.add((EqualParseNode) node);
dependencies.addAll(lhsTableRefSet);
} else if (rhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY
&& lhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
@@ -1069,9 +1059,9 @@ public class JoinCompiler {
}
}
- private static String PROJECTED_TABLE_SCHEMA = ".";
+ private static final String PROJECTED_TABLE_SCHEMA = ".";
// for creation of new statements
- private static ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
+ private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
private static boolean isFlat(SelectStatement select) {
return !select.isJoin()
@@ -1167,7 +1157,7 @@ public class JoinCompiler {
QueryCompiler compiler = new QueryCompiler(statement, select, resolver);
List<Object> binds = statement.getParameters();
StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
- QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false);
+ QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null);
TableRef table = plan.getTableRef();
if (groupByTableRef != null && !groupByTableRef.equals(table)) {
groupByTableRef = null;
@@ -1303,17 +1293,30 @@ public class JoinCompiler {
return new JoinedTableColumnResolver(this, origResolver);
}
- public PTableWrapper mergeProjectedTables(PTableWrapper rWrapper, boolean innerJoin) throws SQLException {
+ public PTableWrapper mergeProjectedTables(PTableWrapper rWrapper, JoinType type) throws SQLException {
PTable left = this.getTable();
PTable right = rWrapper.getTable();
- List<PColumn> merged = new ArrayList<PColumn>();
- merged.addAll(left.getColumns());
+ List<PColumn> merged = Lists.<PColumn> newArrayList();
+ if (type != JoinType.Full) {
+ merged.addAll(left.getColumns());
+ } else {
+ for (PColumn c : left.getColumns()) {
+ if (SchemaUtil.isPKColumn(c)) {
+ merged.add(c);
+ } else {
+ PColumnImpl column = new PColumnImpl(c.getName(), c.getFamilyName(), c.getDataType(),
+ c.getMaxLength(), c.getScale(), true, c.getPosition(),
+ c.getSortOrder(), c.getArraySize(), c.getViewConstant(), c.isViewReferenced());
+ merged.add(column);
+ }
+ }
+ }
int position = merged.size();
for (PColumn c : right.getColumns()) {
if (!SchemaUtil.isPKColumn(c)) {
PColumnImpl column = new PColumnImpl(c.getName(),
PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), c.getDataType(),
- c.getMaxLength(), c.getScale(), innerJoin ? c.isNullable() : true, position++,
+ c.getMaxLength(), c.getScale(), type == JoinType.Inner ? c.isNullable() : true, position++,
c.getSortOrder(), c.getArraySize(), c.getViewConstant(), c.isViewReferenced());
merged.add(column);
}
@@ -1358,12 +1361,16 @@ public class JoinCompiler {
private JoinedTableColumnResolver(PTableWrapper table, ColumnResolver tableResolver) {
this.table = table;
this.tableResolver = tableResolver;
- this.tableRef = new TableRef(null, table.getTable(), 0, false);
+ this.tableRef = new TableRef(ParseNodeFactory.createTempAlias(), table.getTable(), 0, false);
}
public PTableWrapper getPTableWrapper() {
return table;
}
+
+ public TableRef getTableRef() {
+ return tableRef;
+ }
@Override
public List<TableRef> getTables() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 2629846..444b05e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.OrderByNode;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import com.google.common.collect.ImmutableList;
@@ -77,7 +78,8 @@ public class OrderByCompiler {
*/
public static OrderBy compile(StatementContext context,
FilterableStatement statement,
- GroupBy groupBy, Integer limit) throws SQLException {
+ GroupBy groupBy, Integer limit,
+ boolean isInRowKeyOrder) throws SQLException {
List<OrderByNode> orderByNodes = statement.getOrderBy();
if (orderByNodes.isEmpty()) {
return OrderBy.EMPTY_ORDER_BY;
@@ -115,11 +117,14 @@ public class OrderByCompiler {
return OrderBy.EMPTY_ORDER_BY;
}
// If we're ordering by the order returned by the scan, we don't need an order by
- if (visitor.isOrderPreserving()) {
+ if (isInRowKeyOrder && visitor.isOrderPreserving()) {
if (visitor.isReverse()) {
// Don't use reverse scan if we're using a skip scan, as our skip scan doesn't support this yet.
+ // REV_ROW_KEY_ORDER_BY scan would not take effect for a projected table, so don't return it for such table types.
if (context.getConnection().getQueryServices().getProps().getBoolean(QueryServices.USE_REVERSE_SCAN_ATTRIB, QueryServicesOptions.DEFAULT_USE_REVERSE_SCAN)
- && !context.getScanRanges().useSkipScanFilter()) {
+ && !context.getScanRanges().useSkipScanFilter()
+ && context.getCurrentTable().getTable().getType() != PTableType.JOIN
+ && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY) {
return OrderBy.REV_ROW_KEY_ORDER_BY;
}
} else {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 6b767e7..96baafe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
@@ -51,12 +52,17 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.EqualParseNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.parse.OrderByNode;
import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SubqueryParseNode;
+import org.apache.phoenix.parse.TableNode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -78,6 +84,7 @@ import com.google.common.collect.Sets;
* @since 0.1
*/
public class QueryCompiler {
+ private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
/*
* Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't
* want to introduce a dependency on 0.94.5 (where this feature was
@@ -93,6 +100,7 @@ public class QueryCompiler {
private final List<? extends PDatum> targetColumns;
private final ParallelIteratorFactory parallelIteratorFactory;
private final SequenceManager sequenceManager;
+ private final boolean useSortMergeJoin;
public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement));
@@ -106,6 +114,7 @@ public class QueryCompiler {
this.targetColumns = targetColumns;
this.parallelIteratorFactory = parallelIteratorFactory;
this.sequenceManager = sequenceManager;
+ this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
}
@@ -138,26 +147,56 @@ public class QueryCompiler {
context = new StatementContext(statement, resolver, scan, sequenceManager);
}
JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
- return compileJoinQuery(context, binds, joinTable, false);
+ return compileJoinQuery(context, binds, joinTable, false, false, null);
} else {
return compileSingleQuery(context, select, binds, false, true);
}
}
-
+
+ /*
+ * Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes.
+ * This matches the input JoinTable node against patterns in the following order:
+ * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.)
+ * Returns the compilation result of a single table scan or of an independent subquery.
+ * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified):
+ * 1) A LEFT/INNER JOIN B
+ * 2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified
+ * where A can be a named table reference or a flat subquery, and B, C, ... can be a named
+ * table reference, a sub-join or a subquery of any kind.
+ * Returns a HashJoinPlan{scan: A, hash: B, C, ...}.
+ * 3. Matching pattern:
+ * A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified)
+ * where B can be a named table reference or a flat subquery, and A can be a named table
+ * reference, a sub-join or a subquery of any kind.
+ * Returns a HashJoinPlan{scan: B, hash: A}.
+ * NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as
+ * "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the
+ * parenthesis is considered a sub-join.
+ * viewed as a sub-join.
+ * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins.
+ * Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes
+ * of both sides as order-by clauses.
+ * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching.
+ *
+ * If no join algorithm hint is provided, according to the above compilation process, a join query
+ * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other.
+ * TODO 1) Use table statistics to guide the choice of join plans.
+ * 2) Make it possible to hint a certain join algorithm for a specific join step.
+ */
@SuppressWarnings("unchecked")
- protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery) throws SQLException {
+ protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
byte[] emptyByteArray = new byte[0];
List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
if (joinSpecs.isEmpty()) {
Table table = joinTable.getTable();
- SelectStatement subquery = table.getAsSubquery();
+ SelectStatement subquery = table.getAsSubquery(orderBy);
if (!table.isSubselect()) {
- ProjectedPTableWrapper projectedTable = table.createProjectedTable(!asSubquery);
+ ProjectedPTableWrapper projectedTable = table.createProjectedTable(!projectPKColumns);
TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector());
context.setCurrentTable(table.getTableRef());
context.setResolver(projectedTable.createColumnResolver());
table.projectColumns(context.getScan());
- return compileSingleQuery(context, subquery, binds, asSubquery, true);
+ return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery);
}
QueryPlan plan = compileSubquery(subquery);
ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
@@ -165,25 +204,26 @@ public class QueryCompiler {
return new TupleProjectionPlan(plan, projectedTable.createTupleProjector(), table.compilePostFilterExpression(context));
}
- boolean[] starJoinVector = joinTable.getStarJoinVector();
- if (starJoinVector != null) {
+ boolean[] starJoinVector;
+ if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) {
Table table = joinTable.getTable();
ProjectedPTableWrapper initialProjectedTable;
TableRef tableRef;
SelectStatement query;
if (!table.isSubselect()) {
- initialProjectedTable = table.createProjectedTable(!asSubquery);
+ initialProjectedTable = table.createProjectedTable(!projectPKColumns);
tableRef = table.getTableRef();
table.projectColumns(context.getScan());
- query = joinTable.getAsSingleSubquery(table.getAsSubquery(), asSubquery);
+ query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
} else {
- SelectStatement subquery = table.getAsSubquery();
+ SelectStatement subquery = table.getAsSubquery(orderBy);
QueryPlan plan = compileSubquery(subquery);
initialProjectedTable = table.createProjectedTable(plan.getProjector());
tableRef = plan.getTableRef();
context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
}
+ context.setCurrentTable(tableRef);
PTableWrapper projectedTable = initialProjectedTable;
int count = joinSpecs.size();
ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
@@ -199,13 +239,12 @@ public class QueryCompiler {
JoinSpec joinSpec = joinSpecs.get(i);
Scan subScan = ScanUtil.newScan(originalScan);
StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true);
- ColumnResolver resolver = subContext.getResolver();
+ QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true, true, null);
boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
if (hasPostReference) {
- PTableWrapper subProjTable = ((JoinedTableColumnResolver) (resolver)).getPTableWrapper();
+ PTableWrapper subProjTable = ((JoinedTableColumnResolver) subContext.getResolver()).getPTableWrapper();
tables[i] = subProjTable.getTable();
- projectedTable = projectedTable.mergeProjectedTables(subProjTable, joinSpec.getType() == JoinType.Inner);
+ projectedTable = projectedTable.mergeProjectedTables(subProjTable, joinSpec.getType());
needsProject = true;
} else {
tables[i] = null;
@@ -213,13 +252,13 @@ public class QueryCompiler {
if (!starJoinVector[i]) {
needsProject = true;
}
- ColumnResolver leftResolver = (!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver();
+ context.setResolver((!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver());
joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
- Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, leftResolver, resolver);
+ Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true);
joinExpressions[i] = joinConditions.getFirst();
List<Expression> hashExpressions = joinConditions.getSecond();
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- boolean complete = getKeyExpressionCombinations(keyRangeExpressions, context, tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
+ boolean complete = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
boolean hasFilters = joinSpec.getJoinTable().hasFilters();
@@ -233,9 +272,8 @@ public class QueryCompiler {
if (needsProject) {
TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
}
- context.setCurrentTable(tableRef);
context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
- QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, joinTable.isAllLeftJoin());
+ QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin());
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
Integer limit = null;
if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
@@ -247,65 +285,114 @@ public class QueryCompiler {
JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
JoinType type = lastJoinSpec.getType();
- if (type == JoinType.Full)
- throw new SQLFeatureNotSupportedException(type + " joins not supported.");
-
- if (type == JoinType.Right || type == JoinType.Inner) {
- if (!lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty())
- throw new SQLFeatureNotSupportedException("Right join followed by sub-join is not supported.");
-
+ if (!this.useSortMergeJoin
+ && (type == JoinType.Right || type == JoinType.Inner)
+ && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
+ && lastJoinSpec.getJoinTable().getTable().isFlat()) {
JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
Table rhsTable = rhsJoinTable.getTable();
JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
Scan subScan = ScanUtil.newScan(originalScan);
StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true);
- ColumnResolver lhsResolver = lhsCtx.getResolver();
- PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper();
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+ PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) lhsCtx.getResolver()).getPTableWrapper();
ProjectedPTableWrapper rhsProjTable;
TableRef rhsTableRef;
SelectStatement rhs;
if (!rhsTable.isSubselect()) {
- rhsProjTable = rhsTable.createProjectedTable(!asSubquery);
+ rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns);
rhsTableRef = rhsTable.getTableRef();
rhsTable.projectColumns(context.getScan());
- rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(), asSubquery);
+ rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
} else {
- SelectStatement subquery = rhsTable.getAsSubquery();
+ SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
QueryPlan plan = compileSubquery(subquery);
rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
rhsTableRef = plan.getTableRef();
context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
}
+ context.setCurrentTable(rhsTableRef);
boolean forceProjection = rhsTable.isSubselect();
- ColumnResolver rhsResolver = forceProjection ? rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver();
+ context.setResolver(forceProjection ? rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver());
ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
- Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(context, lhsResolver, rhsResolver);
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
List<Expression> joinExpressions = joinConditions.getSecond();
List<Expression> hashExpressions = joinConditions.getFirst();
- int fieldPosition = rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size();
- PTableWrapper projectedTable = rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Inner);
- TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
- context.setCurrentTable(rhsTableRef);
- context.setResolver(projectedTable.createColumnResolver());
- QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, type == JoinType.Right);
+ boolean needsMerge = lhsJoin.hasPostReference();
+ boolean needsProject = forceProjection || asSubquery || needsMerge;
+ PTable lhsTable = needsMerge ? lhsProjTable.getTable() : null;
+ int fieldPosition = needsMerge ? rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size() : 0;
+ PTableWrapper projectedTable = needsMerge ? rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
+ if (needsProject) {
+ TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
+ }
+ context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
+ QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
Integer limit = null;
if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
limit = LimitCompiler.compile(context, rhs);
}
- HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions);
+ getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), lhsJoin.hasFilters())});
}
- // Do not support queries like "A right join B left join C" with hash-joins.
- throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
+ JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+ JoinTable rhsJoin = lastJoinSpec.getJoinTable();
+ if (type == JoinType.Right) {
+ JoinTable temp = lhsJoin;
+ lhsJoin = rhsJoin;
+ rhsJoin = temp;
+ }
+
+ List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
+ List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
+ List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
+ for (EqualParseNode condition : joinConditionNodes) {
+ lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
+ rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
+ }
+
+ Scan lhsScan = ScanUtil.newScan(originalScan);
+ StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
+ boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+ PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) lhsCtx.getResolver()).getPTableWrapper();
+ boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
+
+ Scan rhsScan = ScanUtil.newScan(originalScan);
+ StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
+ QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+ PTableWrapper rhsProjTable = ((JoinedTableColumnResolver) rhsCtx.getResolver()).getPTableWrapper();
+
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false);
+ List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
+ List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
+
+ boolean needsMerge = rhsJoin.hasPostReference();
+ PTable rhsTable = needsMerge ? rhsProjTable.getTable() : null;
+ int fieldPosition = needsMerge ? lhsProjTable.getTable().getColumns().size() - lhsProjTable.getTable().getPKColumns().size() : 0;
+ PTableWrapper projectedTable = needsMerge ? lhsProjTable.mergeProjectedTables(rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+
+ ColumnResolver resolver = projectedTable.createColumnResolver();
+ TableRef tableRef = ((JoinedTableColumnResolver) resolver).getTableRef();
+ StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
+ subCtx.setCurrentTable(tableRef);
+ QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable.getTable(), lhsProjTable.getTable(), rhsTable, fieldPosition);
+ context.setCurrentTable(tableRef);
+ context.setResolver(resolver);
+ TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
+ ParseNode where = joinTable.getPostFiltersCombined();
+ SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence())
+ : NODE_FACTORY.select(joinTable.getStatement(), from, where);
+
+ return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
}
- private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
+ private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
if (type != JoinType.Inner && type != JoinType.Semi)
return false;
@@ -313,7 +400,7 @@ public class QueryCompiler {
StatementContext contextCopy = new StatementContext(statement, context.getResolver(), scanCopy, new SequenceManager(statement));
contextCopy.setCurrentTable(table);
List<Expression> lhsCombination = Lists.<Expression> newArrayList();
- boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, this.select, joinExpressions);
+ boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, select, joinExpressions);
if (lhsCombination.isEmpty())
return false;
@@ -355,7 +442,7 @@ public class QueryCompiler {
protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
SelectStatement innerSelect = select.getInnerSelectStatement();
if (innerSelect == null) {
- return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true);
}
QueryPlan innerPlan = compileSubquery(innerSelect);
@@ -369,10 +456,10 @@ public class QueryCompiler {
tableRef = resolver.getTables().get(0);
context.setCurrentTable(tableRef);
- return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, innerPlan.getOrderBy().getOrderByExpressions().isEmpty() ? tupleProjector : null);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, innerPlan.getOrderBy().getOrderByExpressions().isEmpty());
}
- protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) throws SQLException{
+ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{
PhoenixConnection connection = statement.getConnection();
ColumnResolver resolver = context.getResolver();
TableRef tableRef = context.getCurrentTable();
@@ -384,7 +471,7 @@ public class QueryCompiler {
}
Integer limit = LimitCompiler.compile(context, select);
- GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector);
+ GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector, isInRowKeyOrder);
// Optimize the HAVING clause by finding any group by expressions that can be moved
// to the WHERE clause
select = HavingCompiler.rewrite(context, select, groupBy);
@@ -397,7 +484,7 @@ public class QueryCompiler {
Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet();
Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries);
context.setResolver(resolver); // recover resolver
- OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit);
+ OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, isInRowKeyOrder);
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns);
// Final step is to build the query plan
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index d229478..805894f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -58,6 +58,13 @@ public class SubselectRewriter extends ParseNodeRewriter {
return statement.getLimit() == null && (!statement.isAggregate() || !statement.getGroupBy().isEmpty());
}
+ public static SelectStatement applyOrderBy(SelectStatement statement, List<OrderByNode> orderBy, String subqueryAlias) throws SQLException {
+ if (orderBy == null)
+ return statement;
+
+ return new SubselectRewriter(null, statement.getSelect(), subqueryAlias).applyOrderBy(statement, orderBy);
+ }
+
public static SelectStatement flatten(SelectStatement select, PhoenixConnection connection) throws SQLException {
TableNode from = select.getFrom();
while (from != null && from instanceof DerivedTableNode) {
@@ -209,16 +216,24 @@ public class SubselectRewriter extends ParseNodeRewriter {
if (where != null) {
postFiltersRewrite.add(where);
}
- return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), combine(postFiltersRewrite), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(),
- statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+ return NODE_FACTORY.select(statement, combine(postFiltersRewrite));
}
ParseNode having = statement.getHaving();
if (having != null) {
postFiltersRewrite.add(having);
}
- return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), combine(postFiltersRewrite), statement.getOrderBy(), statement.getLimit(),
- statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+ return NODE_FACTORY.select(statement, statement.getWhere(), combine(postFiltersRewrite));
+ }
+
+ private SelectStatement applyOrderBy(SelectStatement statement, List<OrderByNode> orderBy) throws SQLException {
+ List<OrderByNode> orderByRewrite = Lists.<OrderByNode> newArrayListWithExpectedSize(orderBy.size());
+ for (OrderByNode orderByNode : orderBy) {
+ ParseNode node = orderByNode.getNode();
+ orderByRewrite.add(NODE_FACTORY.orderBy(node.accept(this), orderByNode.isNullsLast(), orderByNode.isAscending()));
+ }
+
+ return NODE_FACTORY.select(statement, orderByRewrite);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 0be40b8..796f368 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -419,6 +419,11 @@ public class UpsertCompiler {
try {
QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement));
queryPlanToBe = compiler.compile();
+ // This is post-fix: if the tableRef is a projected table, this means there are post-processing
+ // steps and parallelIteratorFactory did not take effect.
+ if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.JOIN || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) {
+ parallelIteratorFactoryToBe = null;
+ }
} catch (MetaDataEntityNotFoundException e) {
retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache
throw e;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 2c49fed..471ee37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -140,7 +140,9 @@ public class WhereCompiler {
expression = AndExpression.create(filters);
}
- expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes);
+ if (context.getCurrentTable().getTable().getType() != PTableType.JOIN && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY) {
+ expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes);
+ }
setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily, hashJoinOptimization);
return expression;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 724122d..176520e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -154,7 +154,7 @@ public class HashJoinRegionScanner implements RegionScanner {
for (int i = 0; i < count; i++) {
boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
JoinType type = joinInfo.getJoinTypes()[i];
- if (earlyEvaluation && (tempTuples[i] == null || type == JoinType.Semi))
+ if (earlyEvaluation && (type == JoinType.Semi || type == JoinType.Anti))
continue;
int j = resultQueue.size();
while (j-- > 0) {
@@ -163,12 +163,23 @@ public class HashJoinRegionScanner implements RegionScanner {
ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(lhs, joinInfo.getJoinExpressions()[i]);
tempTuples[i] = hashCaches[i].get(key);
if (tempTuples[i] == null) {
- if (type != JoinType.Inner && type != JoinType.Semi) {
+ if (type == JoinType.Inner || type == JoinType.Semi) {
+ continue;
+ } else if (type == JoinType.Anti) {
resultQueue.offer(lhs);
+ continue;
}
- continue;
}
}
+ if (tempTuples[i] == null) {
+ Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
+ lhs : TupleProjector.mergeProjectedValue(
+ (ProjectedValueTuple) lhs, schema, tempDestBitSet,
+ null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
+ joinInfo.getFieldPositions()[i]);
+ resultQueue.offer(joined);
+ continue;
+ }
for (Tuple t : tempTuples[i]) {
Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
lhs : TupleProjector.mergeProjectedValue(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index 346a9fd..77682e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -152,14 +152,21 @@ public class TupleProjector {
public static class ProjectedValueTuple extends BaseTuple {
private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
private long timestamp;
- private byte[] projectedValue;
+ private ImmutableBytesWritable projectedValue = new ImmutableBytesWritable();
private int bitSetLen;
private KeyValue keyValue;
- private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+ public ProjectedValueTuple(Tuple keyBase, long timestamp, byte[] projectedValue, int valueOffset, int valueLength, int bitSetLen) {
+ keyBase.getKey(this.keyPtr);
+ this.timestamp = timestamp;
+ this.projectedValue.set(projectedValue, valueOffset, valueLength);
+ this.bitSetLen = bitSetLen;
+ }
+
+ public ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int valueOffset, int valueLength, int bitSetLen) {
this.keyPtr.set(keyBuffer, keyOffset, keyLength);
this.timestamp = timestamp;
- this.projectedValue = projectedValue;
+ this.projectedValue.set(projectedValue, valueOffset, valueLength);
this.bitSetLen = bitSetLen;
}
@@ -171,7 +178,7 @@ public class TupleProjector {
return timestamp;
}
- public byte[] getProjectedValue() {
+ public ImmutableBytesWritable getProjectedValue() {
return projectedValue;
}
@@ -196,7 +203,7 @@ public class TupleProjector {
public KeyValue getValue(byte[] family, byte[] qualifier) {
if (keyValue == null) {
keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
- VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+ VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue.get(), projectedValue.getOffset(), projectedValue.getLength());
}
return keyValue;
}
@@ -204,7 +211,7 @@ public class TupleProjector {
@Override
public boolean getValue(byte[] family, byte[] qualifier,
ImmutableBytesWritable ptr) {
- ptr.set(projectedValue);
+ ptr.set(projectedValue.get(), projectedValue.getOffset(), projectedValue.getLength());
return true;
}
@@ -222,7 +229,7 @@ public class TupleProjector {
public ProjectedValueTuple projectResults(Tuple tuple) {
byte[] bytesValue = schema.toBytes(tuple, getExpressions(), valueSet, ptr);
Cell base = tuple.getValue(0);
- return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+ return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, 0, bytesValue.length, valueSet.getEstimatedLength());
}
public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
@@ -233,27 +240,33 @@ public class TupleProjector {
public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
- ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+ ImmutableBytesWritable destValue = dest.getProjectedValue();
+ int origDestBitSetLen = dest.getBitSetLength();
destBitSet.clear();
- destBitSet.or(destValue);
- int origDestBitSetLen = dest.getBitSetLength();
- ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
- decodeProjectedValue(src, srcValue);
- srcBitSet.clear();
- srcBitSet.or(srcValue);
- int origSrcBitSetLen = srcBitSet.getEstimatedLength();
- for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
- if (srcBitSet.get(i)) {
- destBitSet.set(offset + i);
- }
+ destBitSet.or(destValue, origDestBitSetLen);
+ ImmutableBytesWritable srcValue = null;
+ int srcValueLen = 0;
+ if (src != null) {
+ srcValue = new ImmutableBytesWritable();
+ decodeProjectedValue(src, srcValue);
+ srcBitSet.clear();
+ srcBitSet.or(srcValue);
+ int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+ for (int i = 0; i <= srcBitSet.getMaxSetBit(); i++) {
+ if (srcBitSet.get(i)) {
+ destBitSet.set(offset + i);
+ }
+ }
+ srcValueLen = srcValue.getLength() - origSrcBitSetLen;
}
int destBitSetLen = destBitSet.getEstimatedLength();
- byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+ byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValueLen + destBitSetLen];
int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
- o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+ if (src != null) {
+ o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValueLen);
+ }
destBitSet.toBytes(merged, o);
- ImmutableBytesWritable keyPtr = dest.getKeyPtr();
- return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+ return new ProjectedValueTuple(dest, dest.getTimestamp(), merged, 0, merged.length, destBitSetLen);
}
public KeyValueSchema getSchema() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
index 0c68a20..6c332fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
@@ -59,7 +59,8 @@ public class FilterResultIterator extends LookAheadResultIterator {
Tuple next;
do {
next = delegate.next();
- } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+ expression.reset();
+ } while (next != null && (!expression.evaluate(next, ptr) || Boolean.FALSE.equals(expression.getDataType().toObject(ptr))));
return next;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
index 85c91a1..03a360e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
@@ -17,19 +17,12 @@
*/
package org.apache.phoenix.iterate;
-import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
-
+import java.util.Queue;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -41,182 +34,104 @@ import org.apache.phoenix.util.ResultUtil;
import com.google.common.collect.MinMaxPriorityQueue;
-public class MappedByteBufferSortedQueue extends AbstractQueue<ResultEntry> {
+public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEntry> {
private Comparator<ResultEntry> comparator;
private final int limit;
- private final int thresholdBytes;
- private List<MappedByteBufferPriorityQueue> queues = new ArrayList<MappedByteBufferPriorityQueue>();
- private MappedByteBufferPriorityQueue currentQueue = null;
- private int currentIndex = 0;
- MinMaxPriorityQueue<IndexedResultEntry> mergedQueue = null;
public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator,
Integer limit, int thresholdBytes) throws IOException {
+ super(thresholdBytes);
this.comparator = comparator;
this.limit = limit == null ? -1 : limit;
- this.thresholdBytes = thresholdBytes;
- this.currentQueue = new MappedByteBufferPriorityQueue(0,
- this.limit, thresholdBytes, comparator);
- this.queues.add(currentQueue);
- }
-
- @Override
- public boolean offer(ResultEntry e) {
- try {
- boolean isFlush = this.currentQueue.writeResult(e);
- if (isFlush) {
- currentIndex++;
- currentQueue = new MappedByteBufferPriorityQueue(currentIndex,
- limit, thresholdBytes, comparator);
- queues.add(currentQueue);
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- return true;
- }
-
- @Override
- public ResultEntry poll() {
- if (mergedQueue == null) {
- mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
- comparator).maximumSize(queues.size()).create();
- for (MappedByteBufferPriorityQueue queue : queues) {
- try {
- IndexedResultEntry next = queue.getNextResult();
- if (next != null) {
- mergedQueue.add(next);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- if (!mergedQueue.isEmpty()) {
- IndexedResultEntry re = mergedQueue.pollFirst();
- if (re != null) {
- IndexedResultEntry next = null;
- try {
- next = queues.get(re.getIndex()).getNextResult();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- if (next != null) {
- mergedQueue.add(next);
- }
- return re;
- }
- }
- return null;
}
@Override
- public ResultEntry peek() {
- if (mergedQueue == null) {
- mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
- comparator).maximumSize(queues.size()).create();
- for (MappedByteBufferPriorityQueue queue : queues) {
- try {
- IndexedResultEntry next = queue.getNextResult();
- if (next != null) {
- mergedQueue.add(next);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- if (!mergedQueue.isEmpty()) {
- IndexedResultEntry re = mergedQueue.peekFirst();
- if (re != null) {
- return re;
- }
- }
- return null;
+ protected org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry> createSegmentQueue(
+ int index, int thresholdBytes) {
+ return new MappedByteBufferResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
}
@Override
- public Iterator<ResultEntry> iterator() {
- throw new UnsupportedOperationException();
+ protected Comparator<org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry>> getSegmentQueueComparator() {
+ return new Comparator<MappedByteBufferSegmentQueue<ResultEntry>>() {
+ @Override
+ public int compare(MappedByteBufferSegmentQueue<ResultEntry> q1,
+ MappedByteBufferSegmentQueue<ResultEntry> q2) {
+ return comparator.compare(q1.peek(), q2.peek());
+ }};
}
- @Override
- public int size() {
- int size = 0;
- for (MappedByteBufferPriorityQueue queue : queues) {
- size += queue.size();
+ private static class MappedByteBufferResultEntryPriorityQueue extends MappedByteBufferSegmentQueue<ResultEntry> {
+ private MinMaxPriorityQueue<ResultEntry> results = null;
+
+ public MappedByteBufferResultEntryPriorityQueue(int index,
+ int thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
+ super(index, thresholdBytes, limit >= 0);
+ this.results = limit < 0 ?
+ MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
+ : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit).create();
}
- return size;
- }
-
- public long getByteSize() {
- return currentQueue.getInMemByteSize();
- }
- public void close() {
- if (queues != null) {
- for (MappedByteBufferPriorityQueue queue : queues) {
- queue.close();
- }
+ @Override
+ protected Queue<ResultEntry> getInMemoryQueue() {
+ return results;
}
- }
-
- private static class IndexedResultEntry extends ResultEntry {
- private int index;
- public IndexedResultEntry(int index, ResultEntry resultEntry) {
- super(resultEntry.sortKeys, resultEntry.result);
- this.index = index;
+ @Override
+ protected int sizeOf(ResultEntry e) {
+ return sizeof(e.sortKeys) + sizeof(toKeyValues(e));
}
- public int getIndex() {
- return this.index;
+ @Override
+ protected void writeToBuffer(MappedByteBuffer buffer, ResultEntry e) {
+ int totalLen = 0;
+ List<KeyValue> keyValues = toKeyValues(e);
+ for (KeyValue kv : keyValues) {
+ totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
+ }
+ buffer.putInt(totalLen);
+ for (KeyValue kv : keyValues) {
+ buffer.putInt(kv.getLength());
+ buffer.put(kv.getBuffer(), kv.getOffset(), kv
+ .getLength());
+ }
+ ImmutableBytesWritable[] sortKeys = e.sortKeys;
+ buffer.putInt(sortKeys.length);
+ for (ImmutableBytesWritable sortKey : sortKeys) {
+ if (sortKey != null) {
+ buffer.putInt(sortKey.getLength());
+ buffer.put(sortKey.get(), sortKey.getOffset(),
+ sortKey.getLength());
+ } else {
+ buffer.putInt(0);
+ }
+ }
}
- }
- private static class MappedByteBufferPriorityQueue {
- // at least create 128 KB MappedByteBuffers
- private static final long DEFAULT_MAPPING_SIZE = 128 * 1024;
-
- private final int limit;
- private final int thresholdBytes;
- private long totalResultSize = 0;
- private int maxResultSize = 0;
- private long mappingSize = 0;
- private long writeIndex = 0;
- private long readIndex = 0;
- private MappedByteBuffer writeBuffer;
- private MappedByteBuffer readBuffer;
- private FileChannel fc;
- private RandomAccessFile af;
- private File file;
- private boolean isClosed = false;
- MinMaxPriorityQueue<ResultEntry> results = null;
- private boolean flushBuffer = false;
- private int index;
- private int flushedCount;
-
- public MappedByteBufferPriorityQueue(int index, int limit, int thresholdBytes,
- Comparator<ResultEntry> comparator) throws IOException {
- this.index = index;
- this.limit = limit;
- this.thresholdBytes = thresholdBytes;
- results = limit < 0 ?
- MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
- : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit).create();
- }
-
- public int size() {
- if (flushBuffer)
- return flushedCount;
- return results.size();
- }
-
- public long getInMemByteSize() {
- if (flushBuffer)
- return 0;
- return totalResultSize;
+ @Override
+ protected ResultEntry readFromBuffer(MappedByteBuffer buffer) {
+ int length = buffer.getInt();
+ if (length < 0)
+ return null;
+
+ byte[] rb = new byte[length];
+ buffer.get(rb);
+ Result result = ResultUtil.toResult(new ImmutableBytesWritable(rb));
+ ResultTuple rt = new ResultTuple(result);
+ int sortKeySize = buffer.getInt();
+ ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
+ for (int i = 0; i < sortKeySize; i++) {
+ int contentLength = buffer.getInt();
+ if (contentLength > 0) {
+ byte[] sortKeyContent = new byte[contentLength];
+ buffer.get(sortKeyContent);
+ sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
+ } else {
+ sortKeys[i] = null;
+ }
+ }
+
+ return new ResultEntry(sortKeys, rt);
}
private List<KeyValue> toKeyValues(ResultEntry entry) {
@@ -252,138 +167,5 @@ public class MappedByteBufferSortedQueue extends AbstractQueue<ResultEntry> {
}
return size;
}
-
- public boolean writeResult(ResultEntry entry) throws IOException {
- if (flushBuffer)
- throw new IOException("Results already flushed");
-
- int sortKeySize = sizeof(entry.sortKeys);
- int resultSize = sizeof(toKeyValues(entry)) + sortKeySize;
- boolean added = results.add(entry);
- if (added) {
- maxResultSize = Math.max(maxResultSize, resultSize);
- totalResultSize = limit < 0 ? (totalResultSize + resultSize) : maxResultSize * results.size();
- if (totalResultSize >= thresholdBytes) {
- this.file = File.createTempFile(UUID.randomUUID().toString(), null);
- this.af = new RandomAccessFile(file, "rw");
- this.fc = af.getChannel();
- mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
- writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
-
- int resSize = results.size();
- for (int i = 0; i < resSize; i++) {
- int totalLen = 0;
- ResultEntry re = results.pollFirst();
- List<KeyValue> keyValues = toKeyValues(re);
- for (KeyValue kv : keyValues) {
- totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
- }
- writeBuffer.putInt(totalLen);
- for (KeyValue kv : keyValues) {
- writeBuffer.putInt(kv.getLength());
- writeBuffer.put(kv.getBuffer(), kv.getOffset(), kv
- .getLength());
- }
- ImmutableBytesWritable[] sortKeys = re.sortKeys;
- writeBuffer.putInt(sortKeys.length);
- for (ImmutableBytesWritable sortKey : sortKeys) {
- if (sortKey != null) {
- writeBuffer.putInt(sortKey.getLength());
- writeBuffer.put(sortKey.get(), sortKey.getOffset(),
- sortKey.getLength());
- } else {
- writeBuffer.putInt(0);
- }
- }
- // buffer close to exhausted, re-map.
- if (mappingSize - writeBuffer.position() < maxResultSize) {
- writeIndex += writeBuffer.position();
- writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
- }
- }
- writeBuffer.putInt(-1); // end
- flushedCount = results.size();
- results.clear();
- flushBuffer = true;
- }
- }
- return flushBuffer;
- }
-
- public IndexedResultEntry getNextResult() throws IOException {
- if (isClosed)
- return null;
-
- if (!flushBuffer) {
- ResultEntry re = results.poll();
- if (re == null) {
- reachedEnd();
- return null;
- }
- return new IndexedResultEntry(index, re);
- }
-
- if (readBuffer == null) {
- readBuffer = this.fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
- }
-
- int length = readBuffer.getInt();
- if (length < 0) {
- reachedEnd();
- return null;
- }
-
- byte[] rb = new byte[length];
- readBuffer.get(rb);
- Result result = ResultUtil.toResult(new ImmutableBytesWritable(rb));
- ResultTuple rt = new ResultTuple(result);
- int sortKeySize = readBuffer.getInt();
- ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
- for (int i = 0; i < sortKeySize; i++) {
- int contentLength = readBuffer.getInt();
- if (contentLength > 0) {
- byte[] sortKeyContent = new byte[contentLength];
- readBuffer.get(sortKeyContent);
- sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
- } else {
- sortKeys[i] = null;
- }
- }
- // buffer close to exhausted, re-map.
- if (mappingSize - readBuffer.position() < maxResultSize) {
- readIndex += readBuffer.position();
- readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
- }
-
- return new IndexedResultEntry(index, new ResultEntry(sortKeys, rt));
- }
-
- private void reachedEnd() {
- this.isClosed = true;
- if (this.fc != null) {
- try {
- this.fc.close();
- } catch (IOException ignored) {
- }
- this.fc = null;
- }
- if (this.af != null) {
- try {
- this.af.close();
- } catch (IOException ignored) {
- }
- this.af = null;
- }
- if (this.file != null) {
- file.delete();
- file = null;
- }
- }
-
- public void close() {
- if (!isClosed) {
- this.reachedEnd();
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index ea20114..068547f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -82,7 +82,11 @@ public class HintNode {
*/
NO_CACHE,
/**
- * Avoid using star-join optimization.
+ * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm.
+ */
+ USE_SORT_MERGE_JOIN,
+ /**
+ * Avoid using star-join optimization. Used for broadcast join (hash join) only.
*/
NO_STAR_JOIN,
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index cc0b455..6d3123f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -590,6 +590,11 @@ public class ParseNodeFactory {
return new DeleteStatement(table, hint, node, orderBy, limit, bindCount);
}
+ public SelectStatement select(SelectStatement statement, ParseNode where) {
+ return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), statement.getHaving(),
+ statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+ }
+
public SelectStatement select(SelectStatement statement, ParseNode where, ParseNode having) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), having,
statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
@@ -625,6 +630,12 @@ public class ParseNodeFactory {
statement.hasSequence());
}
+ public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy) {
+ return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
+ statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, statement.getLimit(),
+ statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+ }
+
public SelectStatement select(SelectStatement statement, HintNode hint) {
return hint == null || hint.isEmpty() ? statement : select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(),
statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(),
[4/5] phoenix git commit: PHOENIX-1799 Support many-to-many joins
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
index ab06d46..3152abe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
@@ -52,7 +52,7 @@ public class SelectStatementRewriter extends ParseNodeRewriter {
SelectStatementRewriter rewriter = new SelectStatementRewriter(removeNodes);
where = where.accept(rewriter);
// Return new SELECT statement with updated WHERE clause
- return NODE_FACTORY.select(statement, where, statement.getHaving());
+ return NODE_FACTORY.select(statement, where);
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eddc846d/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
index 47acdac..7931659 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
@@ -145,10 +145,14 @@ public class ValueBitSet {
}
public void or(ImmutableBytesWritable ptr) {
- if (schema == null) {
+ or(ptr, isVarLength() ? Bytes.SIZEOF_SHORT + 1 : Bytes.SIZEOF_SHORT);
+ }
+
+ public void or(ImmutableBytesWritable ptr, int length) {
+ if (schema == null || length == 0) {
return;
}
- if (isVarLength()) {
+ if (length > Bytes.SIZEOF_SHORT) {
int offset = ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_SHORT;
short nLongs = Bytes.toShort(ptr.get(), offset);
offset -= nLongs * Bytes.SIZEOF_LONG;
@@ -160,7 +164,7 @@ public class ValueBitSet {
} else {
long l = Bytes.toShort(ptr.get(), ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_SHORT);
bits[0] |= l;
- maxSetBit = Math.max(maxSetBit, BITS_PER_SHORT - 1);
+ maxSetBit = Math.max(maxSetBit, (bits[0] == 0 ? 0 : BITS_PER_SHORT) - 1);
}
}
@@ -196,3 +200,4 @@ public class ValueBitSet {
maxSetBit = Math.max(maxSetBit, isSet.maxSetBit);
}
}
+
[2/5] phoenix git commit: PHOENIX-1799 Support many-to-many joins
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a715a796/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
new file mode 100644
index 0000000..469388b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
@@ -0,0 +1,2822 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.JOIN_CUSTOMER_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_DISPLAY_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_DISPLAY_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_DISPLAY_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+@Category(HBaseManagedTimeTest.class)
+@RunWith(Parameterized.class)
+public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
+
+ private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ private String[] indexDDL;
+ private String[] plans;
+
+ public SortMergeJoinIT(String[] indexDDL, String[] plans) {
+ this.indexDDL = indexDDL;
+ this.plans = plans;
+ }
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Forces server cache to be used
+ props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+ // Must update config before starting server
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void initTable() throws Exception {
+ initTableValues();
+ if (indexDDL != null && indexDDL.length > 0) {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ for (String ddl : indexDDL) {
+ try {
+ conn.createStatement().execute(ddl);
+ } catch (TableAlreadyExistsException e) {
+ }
+ }
+ conn.close();
+ }
+ }
+
+ @Parameters
+ public static Collection<Object> data() {
+ List<Object> testCases = Lists.newArrayList();
+ testCases.add(new String[][] {
+ {}, {
+ "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ "AND\n" +
+ " SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER SORTED BY [I.item_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ " AND (SKIP MERGE)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER FILTER BY QUANTITY < 5000\n" +
+ " SERVER SORTED BY [O.item_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ " CLIENT SORTED BY [I.supplier_id]",
+ }});
+ testCases.add(new String[][] {
+ {
+ "CREATE INDEX \"idx_customer\" ON " + JOIN_CUSTOMER_TABLE_FULL_NAME + " (name)",
+ "CREATE INDEX \"idx_item\" ON " + JOIN_ITEM_TABLE_FULL_NAME + " (name) INCLUDE (price, discount1, discount2, \"supplier_id\", description)",
+ "CREATE INDEX \"idx_supplier\" ON " + JOIN_SUPPLIER_TABLE_FULL_NAME + " (name)"
+ }, {
+ "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_supplier\n" +
+ " SERVER SORTED BY [S.:supplier_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ "AND\n" +
+ " SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+ " SERVER SORTED BY [I.:item_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ " AND (SKIP MERGE)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER FILTER BY QUANTITY < 5000\n" +
+ " SERVER SORTED BY [O.item_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ " CLIENT SORTED BY [I.0:supplier_id]"
+ }});
+ testCases.add(new String[][] {
+ {
+ "CREATE LOCAL INDEX \"idx_customer\" ON " + JOIN_CUSTOMER_TABLE_FULL_NAME + " (name)",
+ "CREATE LOCAL INDEX \"idx_item\" ON " + JOIN_ITEM_TABLE_FULL_NAME + " (name) INCLUDE (price, discount1, discount2, \"supplier_id\", description)",
+ "CREATE LOCAL INDEX \"idx_supplier\" ON " + JOIN_SUPPLIER_TABLE_FULL_NAME + " (name)"
+ }, {
+ "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + " [-32768]\n" +
+ " SERVER SORTED BY [S.:supplier_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ "AND\n" +
+ " SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+ " SERVER SORTED BY [I.:item_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ " AND (SKIP MERGE)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER FILTER BY QUANTITY < 5000\n" +
+ " SERVER SORTED BY [O.item_id]\n" +
+ " CLIENT MERGE SORT\n" +
+ " CLIENT SORTED BY [I.0:supplier_id]"
+ }});
+ return testCases;
+ }
+
+
+ protected void initTableValues() throws Exception {
+ ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
+ ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME);
+ ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
+ ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ conn.createStatement().execute("CREATE SEQUENCE my.seq");
+ // Insert into customer table
+ PreparedStatement stmt = conn.prepareStatement(
+ "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME +
+ " (\"customer_id\", " +
+ " NAME, " +
+ " PHONE, " +
+ " ADDRESS, " +
+ " LOC_ID, " +
+ " DATE) " +
+ "values (?, ?, ?, ?, ?, ?)");
+ stmt.setString(1, "0000000001");
+ stmt.setString(2, "C1");
+ stmt.setString(3, "999-999-1111");
+ stmt.setString(4, "101 XXX Street");
+ stmt.setString(5, "10001");
+ stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "0000000002");
+ stmt.setString(2, "C2");
+ stmt.setString(3, "999-999-2222");
+ stmt.setString(4, "202 XXX Street");
+ stmt.setString(5, null);
+ stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "0000000003");
+ stmt.setString(2, "C3");
+ stmt.setString(3, "999-999-3333");
+ stmt.setString(4, "303 XXX Street");
+ stmt.setString(5, null);
+ stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "0000000004");
+ stmt.setString(2, "C4");
+ stmt.setString(3, "999-999-4444");
+ stmt.setString(4, "404 XXX Street");
+ stmt.setString(5, "10004");
+ stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "0000000005");
+ stmt.setString(2, "C5");
+ stmt.setString(3, "999-999-5555");
+ stmt.setString(4, "505 XXX Street");
+ stmt.setString(5, "10005");
+ stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "0000000006");
+ stmt.setString(2, "C6");
+ stmt.setString(3, "999-999-6666");
+ stmt.setString(4, "606 XXX Street");
+ stmt.setString(5, "10001");
+ stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
+ stmt.execute();
+
+ // Insert into item table
+ stmt = conn.prepareStatement(
+ "upsert into " + JOIN_ITEM_TABLE_FULL_NAME +
+ " (\"item_id\", " +
+ " NAME, " +
+ " PRICE, " +
+ " DISCOUNT1, " +
+ " DISCOUNT2, " +
+ " \"supplier_id\", " +
+ " DESCRIPTION) " +
+ "values (?, ?, ?, ?, ?, ?, ?)");
+ stmt.setString(1, "0000000001");
+ stmt.setString(2, "T1");
+ stmt.setInt(3, 100);
+ stmt.setInt(4, 5);
+ stmt.setInt(5, 10);
+ stmt.setString(6, "0000000001");
+ stmt.setString(7, "Item T1");
+ stmt.execute();
+
+ stmt.setString(1, "0000000002");
+ stmt.setString(2, "T2");
+ stmt.setInt(3, 200);
+ stmt.setInt(4, 5);
+ stmt.setInt(5, 8);
+ stmt.setString(6, "0000000001");
+ stmt.setString(7, "Item T2");
+ stmt.execute();
+
+ stmt.setString(1, "0000000003");
+ stmt.setString(2, "T3");
+ stmt.setInt(3, 300);
+ stmt.setInt(4, 8);
+ stmt.setInt(5, 12);
+ stmt.setString(6, "0000000002");
+ stmt.setString(7, "Item T3");
+ stmt.execute();
+
+ stmt.setString(1, "0000000004");
+ stmt.setString(2, "T4");
+ stmt.setInt(3, 400);
+ stmt.setInt(4, 6);
+ stmt.setInt(5, 10);
+ stmt.setString(6, "0000000002");
+ stmt.setString(7, "Item T4");
+ stmt.execute();
+
+ stmt.setString(1, "0000000005");
+ stmt.setString(2, "T5");
+ stmt.setInt(3, 500);
+ stmt.setInt(4, 8);
+ stmt.setInt(5, 15);
+ stmt.setString(6, "0000000005");
+ stmt.setString(7, "Item T5");
+ stmt.execute();
+
+ stmt.setString(1, "0000000006");
+ stmt.setString(2, "T6");
+ stmt.setInt(3, 600);
+ stmt.setInt(4, 8);
+ stmt.setInt(5, 15);
+ stmt.setString(6, "0000000006");
+ stmt.setString(7, "Item T6");
+ stmt.execute();
+
+ stmt.setString(1, "invalid001");
+ stmt.setString(2, "INVALID-1");
+ stmt.setInt(3, 0);
+ stmt.setInt(4, 0);
+ stmt.setInt(5, 0);
+ stmt.setString(6, "0000000000");
+ stmt.setString(7, "Invalid item for join test");
+ stmt.execute();
+
+ // Insert into supplier table
+ stmt = conn.prepareStatement(
+ "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME +
+ " (\"supplier_id\", " +
+ " NAME, " +
+ " PHONE, " +
+ " ADDRESS, " +
+ " LOC_ID) " +
+ "values (?, ?, ?, ?, ?)");
+ stmt.setString(1, "0000000001");
+ stmt.setString(2, "S1");
+ stmt.setString(3, "888-888-1111");
+ stmt.setString(4, "101 YYY Street");
+ stmt.setString(5, "10001");
+ stmt.execute();
+
+ stmt.setString(1, "0000000002");
+ stmt.setString(2, "S2");
+ stmt.setString(3, "888-888-2222");
+ stmt.setString(4, "202 YYY Street");
+ stmt.setString(5, "10002");
+ stmt.execute();
+
+ stmt.setString(1, "0000000003");
+ stmt.setString(2, "S3");
+ stmt.setString(3, "888-888-3333");
+ stmt.setString(4, "303 YYY Street");
+ stmt.setString(5, null);
+ stmt.execute();
+
+ stmt.setString(1, "0000000004");
+ stmt.setString(2, "S4");
+ stmt.setString(3, "888-888-4444");
+ stmt.setString(4, "404 YYY Street");
+ stmt.setString(5, null);
+ stmt.execute();
+
+ stmt.setString(1, "0000000005");
+ stmt.setString(2, "S5");
+ stmt.setString(3, "888-888-5555");
+ stmt.setString(4, "505 YYY Street");
+ stmt.setString(5, "10005");
+ stmt.execute();
+
+ stmt.setString(1, "0000000006");
+ stmt.setString(2, "S6");
+ stmt.setString(3, "888-888-6666");
+ stmt.setString(4, "606 YYY Street");
+ stmt.setString(5, "10006");
+ stmt.execute();
+
+ // Insert into order table
+ stmt = conn.prepareStatement(
+ "upsert into " + JOIN_ORDER_TABLE_FULL_NAME +
+ " (\"order_id\", " +
+ " \"customer_id\", " +
+ " \"item_id\", " +
+ " PRICE, " +
+ " QUANTITY," +
+ " DATE) " +
+ "values (?, ?, ?, ?, ?, ?)");
+ stmt.setString(1, "000000000000001");
+ stmt.setString(2, "0000000004");
+ stmt.setString(3, "0000000001");
+ stmt.setInt(4, 100);
+ stmt.setInt(5, 1000);
+ stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "000000000000002");
+ stmt.setString(2, "0000000003");
+ stmt.setString(3, "0000000006");
+ stmt.setInt(4, 552);
+ stmt.setInt(5, 2000);
+ stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "000000000000003");
+ stmt.setString(2, "0000000002");
+ stmt.setString(3, "0000000002");
+ stmt.setInt(4, 190);
+ stmt.setInt(5, 3000);
+ stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "000000000000004");
+ stmt.setString(2, "0000000004");
+ stmt.setString(3, "0000000006");
+ stmt.setInt(4, 510);
+ stmt.setInt(5, 4000);
+ stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime()));
+ stmt.execute();
+
+ stmt.setString(1, "000000000000005");
+ stmt.setString(2, "0000000005");
+ stmt.setString(3, "0000000003");
+ stmt.setInt(4, 264);
+ stmt.setInt(5, 5000);
+ stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime()));
+ stmt.execute();
+
+ conn.commit();
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testDefaultJoin() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000006");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "0000000006");
+ assertEquals(rs.getString(4), "S6");
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testInnerJoin() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertEquals(1, rs.getInt(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertEquals(2, rs.getInt(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertEquals(3, rs.getInt(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertEquals(4, rs.getInt(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+ assertEquals(5, rs.getInt(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000006");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "0000000006");
+ assertEquals(rs.getString(4), "S6");
+ assertEquals(6, rs.getInt(5));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testLeftJoin() throws Exception {
+ String query[] = new String[3];
+ query[0] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ query[1] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ " + JOIN_ITEM_TABLE_FULL_NAME + ".\"item_id\", " + JOIN_ITEM_TABLE_FULL_NAME + ".name, " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".\"supplier_id\", " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " ON " + JOIN_ITEM_TABLE_FULL_NAME + ".\"supplier_id\" = " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".\"supplier_id\" ORDER BY \"item_id\"";
+ query[2] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", " + JOIN_ITEM_TABLE_FULL_NAME + ".name, supp.\"supplier_id\", " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON " + JOIN_ITEM_TABLE_FULL_NAME + ".\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ for (int i = 0; i < query.length; i++) {
+ PreparedStatement statement = conn.prepareStatement(query[i]);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000006");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "0000000006");
+ assertEquals(rs.getString(4), "S6");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "invalid001");
+ assertEquals(rs.getString(2), "INVALID-1");
+ assertNull(rs.getString(3));
+ assertNull(rs.getString(4));
+
+ assertFalse(rs.next());
+ rs.close();
+ statement.close();
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testRightJoin() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp RIGHT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000006");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "0000000006");
+ assertEquals(rs.getString(4), "S6");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "invalid001");
+ assertEquals(rs.getString(2), "INVALID-1");
+ assertNull(rs.getString(3));
+ assertNull(rs.getString(4));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testInnerJoinWithPreFilters() throws Exception {
+ String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005' ORDER BY \"item_id\"";
+ String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005') ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query1);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+
+ assertFalse(rs.next());
+
+
+ statement = conn.prepareStatement(query2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testLeftJoinWithPreFilters() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005') ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertNull(rs.getString(3));
+ assertNull(rs.getString(4));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertNull(rs.getString(3));
+ assertNull(rs.getString(4));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000006");
+ assertEquals(rs.getString(2), "T6");
+ assertNull(rs.getString(3));
+ assertNull(rs.getString(4));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "invalid001");
+ assertEquals(rs.getString(2), "INVALID-1");
+ assertNull(rs.getString(3));
+ assertNull(rs.getString(4));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testJoinWithPostFilters() throws Exception {
+ String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp RIGHT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " item ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005' ORDER BY \"item_id\"";
+ String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005' ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query1);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+
+ assertFalse(rs.next());
+
+
+ statement = conn.prepareStatement(query2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "0000000001");
+ assertEquals(rs.getString(4), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testStarJoin() throws Exception {
+ String[] query = new String[5];
+ query[0] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN "
+ + JOIN_CUSTOMER_TABLE_FULL_NAME + " c ON o.\"customer_id\" = c.\"customer_id\" JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\"";
+ query[1] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o, "
+ + JOIN_CUSTOMER_TABLE_FULL_NAME + " c, "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i WHERE o.\"item_id\" = i.\"item_id\" AND o.\"customer_id\" = c.\"customer_id\" ORDER BY \"order_id\"";
+ query[2] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN "
+ + JOIN_CUSTOMER_TABLE_FULL_NAME + " c ON o.\"customer_id\" = c.\"customer_id\" JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\"";
+ query[3] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM (" + JOIN_ORDER_TABLE_FULL_NAME + " o, "
+ + JOIN_CUSTOMER_TABLE_FULL_NAME + " c), "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i WHERE o.\"item_id\" = i.\"item_id\" AND o.\"customer_id\" = c.\"customer_id\" ORDER BY \"order_id\"";
+ query[4] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o, ("
+ + JOIN_CUSTOMER_TABLE_FULL_NAME + " c, "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i) WHERE o.\"item_id\" = i.\"item_id\" AND o.\"customer_id\" = c.\"customer_id\" ORDER BY \"order_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ for (int i = 0; i < query.length; i++) {
+ PreparedStatement statement = conn.prepareStatement(query[i]);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString("\"order_id\""), "000000000000001");
+ assertEquals(rs.getString(2), "C4");
+ assertEquals(rs.getString("C.name"), "C4");
+ assertEquals(rs.getString(3), "T1");
+ assertEquals(rs.getString("iName"), "T1");
+ assertEquals(rs.getInt(4), 1000);
+ assertEquals(rs.getInt("Quantity"), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "C3");
+ assertEquals(rs.getString(3), "T6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "C2");
+ assertEquals(rs.getString(3), "T2");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "C4");
+ assertEquals(rs.getString(3), "T6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "C5");
+ assertEquals(rs.getString(3), "T3");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testLeftJoinWithAggregation() throws Exception {
+ String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.name ORDER BY i.name";
+ String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.\"item_id\" iid, sum(quantity) q FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.\"item_id\" ORDER BY q DESC";
+ String query3 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.\"item_id\" iid, sum(quantity) q FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i LEFT JOIN "
+ + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" GROUP BY i.\"item_id\" ORDER BY q DESC NULLS LAST, iid";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query1);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T1");
+ assertEquals(rs.getInt(2), 1000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T2");
+ assertEquals(rs.getInt(2), 3000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T3");
+ assertEquals(rs.getInt(2), 5000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T6");
+ assertEquals(rs.getInt(2), 6000);
+
+ assertFalse(rs.next());
+
+ statement = conn.prepareStatement(query2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000006");
+ assertEquals(rs.getInt("q"), 6000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000003");
+ assertEquals(rs.getInt("q"), 5000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000002");
+ assertEquals(rs.getInt("q"), 3000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000001");
+ assertEquals(rs.getInt("q"), 1000);
+
+ assertFalse(rs.next());
+
+ statement = conn.prepareStatement(query3);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000006");
+ assertEquals(rs.getInt("q"), 6000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000003");
+ assertEquals(rs.getInt("q"), 5000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000002");
+ assertEquals(rs.getInt("q"), 3000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000001");
+ assertEquals(rs.getInt("q"), 1000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000004");
+ assertEquals(rs.getInt("q"), 0);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000005");
+ assertEquals(rs.getInt("q"), 0);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "invalid001");
+ assertEquals(rs.getInt("q"), 0);
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testRightJoinWithAggregation() throws Exception {
+ String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.name ORDER BY i.name";
+ String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.\"item_id\" iid, sum(quantity) q FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.\"item_id\" ORDER BY q DESC NULLS LAST, iid";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query1);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "INVALID-1");
+ assertEquals(rs.getInt(2), 0);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T1");
+ assertEquals(rs.getInt(2), 1000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T2");
+ assertEquals(rs.getInt(2), 3000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T3");
+ assertEquals(rs.getInt(2), 5000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T4");
+ assertEquals(rs.getInt(2), 0);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T5");
+ assertEquals(rs.getInt(2), 0);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "T6");
+ assertEquals(rs.getInt(2), 6000);
+
+ assertFalse(rs.next());
+
+ statement = conn.prepareStatement(query2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000006");
+ assertEquals(rs.getInt("q"), 6000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000003");
+ assertEquals(rs.getInt("q"), 5000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000002");
+ assertEquals(rs.getInt("q"), 3000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000001");
+ assertEquals(rs.getInt("q"), 1000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000004");
+ assertEquals(rs.getInt("q"), 0);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "0000000005");
+ assertEquals(rs.getInt("q"), 0);
+ assertTrue (rs.next());
+ assertEquals(rs.getString("iid"), "invalid001");
+ assertEquals(rs.getInt("q"), 0);
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testLeftRightJoin() throws Exception {
+ String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+ + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+ String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN "
+ + "(" + JOIN_ITEM_TABLE_FULL_NAME + " i RIGHT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\")"
+ + " ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query1);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertNull(rs.getString(2));
+ assertEquals(rs.getString(3), "S5");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertNull(rs.getString(2));
+ assertEquals(rs.getString(3), "S4");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertNull(rs.getString(2));
+ assertEquals(rs.getString(3), "S3");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+
+ statement = conn.prepareStatement(query2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testRightLeftJoin() throws Exception {
+ String query1 = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i RIGHT JOIN "
+ + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LEFT JOIN "
+ + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\"";
+ String query2 = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+ + "(" + JOIN_ITEM_TABLE_FULL_NAME + " i LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\")"
+ + " ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query1);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+
+ statement = conn.prepareStatement(query2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "INVALID-1");
+ assertNull(rs.getString(3));
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "S5");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testMultiLeftJoin() throws Exception {
+ String[] queries = {
+ "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" LEFT JOIN "
+ + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\"",
+ "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN "
+ + "(" + JOIN_ITEM_TABLE_FULL_NAME + " i LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\") "
+ + "ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\""};
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ for (String query : queries) {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testMultiRightJoin() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+ + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "S5");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertNull(rs.getString(2));
+ assertEquals(rs.getString(3), "S4");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertNull(rs.getString(2));
+ assertEquals(rs.getString(3), "S3");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ // Basically a copy of testMultiRightJoin, but with a very small result scan chunk size
+ // to test that repeated row keys within a single chunk are handled properly
+ @Test
+ public void testMultiRightJoin_SmallChunkSize() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+ + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "S5");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertNull(rs.getString(2));
+ assertEquals(rs.getString(3), "S4");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertNull(rs.getString(2));
+ assertEquals(rs.getString(3), "S3");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 0);
+ assertNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 1000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 2000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getInt(4), 3000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getInt(4), 4000);
+ assertNotNull(rs.getDate(5));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getInt(4), 5000);
+ assertNotNull(rs.getDate(5));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testJoinWithWildcard() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ * FROM " + JOIN_ITEM_TABLE_FULL_NAME + " LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON " + JOIN_ITEM_TABLE_FULL_NAME + ".\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000001");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T1");
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 100);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 5);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 10);
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000001");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T1");
+ assertEquals(rs.getString("SUPP.supplier_id"), "0000000001");
+ assertEquals(rs.getString("supp.name"), "S1");
+ assertEquals(rs.getString("supp.phone"), "888-888-1111");
+ assertEquals(rs.getString("supp.address"), "101 YYY Street");
+ assertEquals(rs.getString("supp.loc_id"), "10001");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000002");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T2");
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 200);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 5);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 8);
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000001");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T2");
+ assertEquals(rs.getString("SUPP.supplier_id"), "0000000001");
+ assertEquals(rs.getString("supp.name"), "S1");
+ assertEquals(rs.getString("supp.phone"), "888-888-1111");
+ assertEquals(rs.getString("supp.address"), "101 YYY Street");
+ assertEquals(rs.getString("supp.loc_id"), "10001");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000003");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T3");
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 300);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 8);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 12);
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000002");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T3");
+ assertEquals(rs.getString("SUPP.supplier_id"), "0000000002");
+ assertEquals(rs.getString("supp.name"), "S2");
+ assertEquals(rs.getString("supp.phone"), "888-888-2222");
+ assertEquals(rs.getString("supp.address"), "202 YYY Street");
+ assertEquals(rs.getString("supp.loc_id"), "10002");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000004");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T4");
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 400);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 6);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 10);
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000002");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T4");
+ assertEquals(rs.getString("SUPP.supplier_id"), "0000000002");
+ assertEquals(rs.getString("supp.name"), "S2");
+ assertEquals(rs.getString("supp.phone"), "888-888-2222");
+ assertEquals(rs.getString("supp.address"), "202 YYY Street");
+ assertEquals(rs.getString("supp.loc_id"), "10002");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000005");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T5");
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 500);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 8);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 15);
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000005");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T5");
+ assertEquals(rs.getString("SUPP.supplier_id"), "0000000005");
+ assertEquals(rs.getString("supp.name"), "S5");
+ assertEquals(rs.getString("supp.phone"), "888-888-5555");
+ assertEquals(rs.getString("supp.address"), "505 YYY Street");
+ assertEquals(rs.getString("supp.loc_id"), "10005");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000006");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T6");
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 600);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 8);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 15);
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000006");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T6");
+ assertEquals(rs.getString("SUPP.supplier_id"), "0000000006");
+ assertEquals(rs.getString("supp.name"), "S6");
+ assertEquals(rs.getString("supp.phone"), "888-888-6666");
+ assertEquals(rs.getString("supp.address"), "606 YYY Street");
+ assertEquals(rs.getString("supp.loc_id"), "10006");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "invalid001");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "INVALID-1");
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 0);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 0);
+ assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 0);
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000000");
+ assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Invalid item for join test");
+ assertNull(rs.getString("SUPP.supplier_id"));
+ assertNull(rs.getString("supp.name"));
+ assertNull(rs.getString("supp.phone"));
+ assertNull(rs.getString("supp.address"));
+ assertNull(rs.getString("supp.loc_id"));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testJoinWithTableWildcard() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ s.*, "+ JOIN_ITEM_TABLE_FULL_NAME + ".*, \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+ + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ ResultSetMetaData md = rs.getMetaData();
+ assertEquals(md.getColumnCount(), 13);
+
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "S5");
+ assertEquals(rs.getString(3), "888-888-5555");
+ assertEquals(rs.getString(4), "505 YYY Street");
+ assertEquals(rs.getString(5), "10005");
+ assertEquals(rs.getString(6), "0000000005");
+ assertEquals(rs.getString(7), "T5");
+ assertEquals(rs.getInt(8), 500);
+ assertEquals(rs.getInt(9), 8);
+ assertEquals(rs.getInt(10), 15);
+ assertEquals(rs.getString(11), "0000000005");
+ assertEquals(rs.getString(12), "Item T5");
+ assertNull(rs.getString(13));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "S4");
+ assertEquals(rs.getString(3), "888-888-4444");
+ assertEquals(rs.getString(4), "404 YYY Street");
+ assertNull(rs.getString(5));
+ assertNull(rs.getString(6));
+ assertNull(rs.getString(7));
+ assertEquals(rs.getInt(8), 0);
+ assertEquals(rs.getInt(9), 0);
+ assertEquals(rs.getInt(10), 0);
+ assertNull(rs.getString(11));
+ assertNull(rs.getString(12));
+ assertNull(rs.getString(13));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "S3");
+ assertEquals(rs.getString(3), "888-888-3333");
+ assertEquals(rs.getString(4), "303 YYY Street");
+ assertNull(rs.getString(5));
+ assertNull(rs.getString(6));
+ assertNull(rs.getString(7));
+ assertEquals(rs.getInt(8), 0);
+ assertEquals(rs.getInt(9), 0);
+ assertEquals(rs.getInt(10), 0);
+ assertNull(rs.getString(11));
+ assertNull(rs.getString(12));
+ assertNull(rs.getString(13));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "S2");
+ assertEquals(rs.getString(3), "888-888-2222");
+ assertEquals(rs.getString(4), "202 YYY Street");
+ assertEquals(rs.getString(5), "10002");
+ assertEquals(rs.getString(6), "0000000004");
+ assertEquals(rs.getString(7), "T4");
+ assertEquals(rs.getInt(8), 400);
+ assertEquals(rs.getInt(9), 6);
+ assertEquals(rs.getInt(10), 10);
+ assertEquals(rs.getString(11), "0000000002");
+ assertEquals(rs.getString(12), "Item T4");
+ assertNull(rs.getString(13));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "S1");
+ assertEquals(rs.getString(3), "888-888-1111");
+ assertEquals(rs.getString(4), "101 YYY Street");
+ assertEquals(rs.getString(5), "10001");
+ assertEquals(rs.getString(6), "0000000001");
+ assertEquals(rs.getString(7), "T1");
+ assertEquals(rs.getInt(8), 100);
+ assertEquals(rs.getInt(9), 5);
+ assertEquals(rs.getInt(10), 10);
+ assertEquals(rs.getString(11), "0000000001");
+ assertEquals(rs.getString(12), "Item T1");
+ assertEquals(rs.getString(13), "000000000000001");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000006");
+ assertEquals(rs.getString(2), "S6");
+ assertEquals(rs.getString(3), "888-888-6666");
+ assertEquals(rs.getString(4), "606 YYY Street");
+ assertEquals(rs.getString(5), "10006");
+ assertEquals(rs.getString(6), "0000000006");
+ assertEquals(rs.getString(7), "T6");
+ assertEquals(rs.getInt(8), 600);
+ assertEquals(rs.getInt(9), 8);
+ assertEquals(rs.getInt(10), 15);
+ assertEquals(rs.getString(11), "0000000006");
+ assertEquals(rs.getString(12), "Item T6");
+ assertEquals(rs.getString(13), "000000000000002");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000001");
+ assertEquals(rs.getString(2), "S1");
+ assertEquals(rs.getString(3), "888-888-1111");
+ assertEquals(rs.getString(4), "101 YYY Street");
+ assertEquals(rs.getString(5), "10001");
+ assertEquals(rs.getString(6), "0000000002");
+ assertEquals(rs.getString(7), "T2");
+ assertEquals(rs.getInt(8), 200);
+ assertEquals(rs.getInt(9), 5);
+ assertEquals(rs.getInt(10), 8);
+ assertEquals(rs.getString(11), "0000000001");
+ assertEquals(rs.getString(12), "Item T2");
+ assertEquals(rs.getString(13), "000000000000003");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000006");
+ assertEquals(rs.getString(2), "S6");
+ assertEquals(rs.getString(3), "888-888-6666");
+ assertEquals(rs.getString(4), "606 YYY Street");
+ assertEquals(rs.getString(5), "10006");
+ assertEquals(rs.getString(6), "0000000006");
+ assertEquals(rs.getString(7), "T6");
+ assertEquals(rs.getInt(8), 600);
+ assertEquals(rs.getInt(9), 8);
+ assertEquals(rs.getInt(10), 15);
+ assertEquals(rs.getString(11), "0000000006");
+ assertEquals(rs.getString(12), "Item T6");
+ assertEquals(rs.getString(13), "000000000000004");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000002");
+ assertEquals(rs.getString(2), "S2");
+ assertEquals(rs.getString(3), "888-888-2222");
+ assertEquals(rs.getString(4), "202 YYY Street");
+ assertEquals(rs.getString(5), "10002");
+ assertEquals(rs.getString(6), "0000000003");
+ assertEquals(rs.getString(7), "T3");
+ assertEquals(rs.getInt(8), 300);
+ assertEquals(rs.getInt(9), 8);
+ assertEquals(rs.getInt(10), 12);
+ assertEquals(rs.getString(11), "0000000002");
+ assertEquals(rs.getString(12), "Item T3");
+ assertEquals(rs.getString(13), "000000000000005");
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testJoinMultiJoinKeys() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ c.name, s.name FROM " + JOIN_CUSTOMER_TABLE_FULL_NAME + " c LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON \"customer_id\" = \"supplier_id\" AND c.loc_id = s.loc_id AND substr(s.name, 2, 1) = substr(c.name, 2, 1) ORDER BY \"customer_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "C1");
+ assertEquals(rs.getString(2), "S1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "C2");
+ assertNull(rs.getString(2));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "C3");
+ assertEquals(rs.getString(2), "S3");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "C4");
+ assertNull(rs.getString(2));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "C5");
+ assertEquals(rs.getString(2), "S5");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "C6");
+ assertNull(rs.getString(2));
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testJoinWithDifferentNumericJoinKeyTypes() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, i.price, discount2, quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o INNER JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" AND o.price = (i.price * (100 - discount2)) / 100.0 WHERE quantity < 5000";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getInt(3), 600);
+ assertEquals(rs.getInt(4), 15);
+ assertEquals(rs.getInt(5), 4000);
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testJoinWithDifferentDateJoinKeyTypes() throws Exception {
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o INNER JOIN "
+ + JOIN_CUSTOMER_TABLE_FULL_NAME + " c ON o.\"customer_id\" = c.\"customer_id\" AND o.date = c.date ORDER BY \"order_id\"";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "C4");
+ assertEquals(rs.getTimestamp(3), new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "C3");
+ assertEquals(rs.getTimestamp(3), new Ti
<TRUNCATED>