You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/14 23:42:24 UTC
[35/50] [abbrv] phoenix git commit: PHOENIX-1598 Column encoding to
save space and improve performance
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
new file mode 100644
index 0000000..5a5b355
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+
+/**
+ * List implementation that provides index-based lookup when the cell column qualifiers are positive numbers.
+ * These qualifiers are generated by using one of the column qualifier encoding schemes specified in
+ * {@link QualifierEncodingScheme}, so every cell has a fixed slot in the backing array that is derived from
+ * its decoded qualifier. The API methods in this list assume that the caller wants to see and add only
+ * non-null elements in the list.
+ * <p>
+ * Please note that this implementation doesn't implement all the optional methods of the
+ * {@link List} interface. Such unsupported methods could violate the basic invariant of the list that every
+ * cell with an encoded column qualifier has a fixed position in the list.
+ * </p>
+ * <p>
+ * An important performance characteristic of this list is that doing look up on the basis of index via
+ * {@link #get(int)} is an O(n) operation. This makes iterating through the list using {@link #get(int)} an
+ * O(n^2) operation. Instead, for iterating through the list, one should use the iterators created through
+ * {@link #iterator()} or {@link #listIterator()}. Getting an element via
+ * {@link #getCellForColumnQualifier(byte[])} only decodes the qualifier and reads a single array slot, and
+ * should generally be the way for accessing elements in the list.
+ * </p>
+ * <p>
+ * This class is not thread-safe.
+ * </p>
+ */
+@NotThreadSafe
+public class EncodedColumnQualiferCellsList implements List<Cell> {
+
+    // Storage layout: a qualifier q below ENCODED_CQ_COUNTER_INITIAL_VALUE (the reserved range) lives in
+    // slot q; a larger qualifier lives in slot q - nonReservedRangeOffset. See getArrayIndex().
+    private final int minQualifier;
+    private final int maxQualifier;
+    // Non-zero only when minQualifier is above ENCODED_CQ_COUNTER_INITIAL_VALUE; it moves minQualifier to
+    // the first slot after the reserved range.
+    private final int nonReservedRangeOffset;
+    private final Cell[] array;
+    private int numNonNullElements;
+    // Smallest occupied slot of array, or -1 when the list is empty.
+    private int firstNonNullElementIdx = -1;
+    private static final int RESERVED_RANGE_SIZE = ENCODED_CQ_COUNTER_INITIAL_VALUE - ENCODED_EMPTY_COLUMN_NAME;
+    // Used by iterators to figure out if the list was structurally modified.
+    private int modCount = 0;
+    private final QualifierEncodingScheme encodingScheme;
+
+    /**
+     * Creates an empty list that accepts cells whose decoded qualifiers are either in the reserved range or
+     * in {@code [minQ, maxQ]}.
+     *
+     * @param minQ smallest non-reserved qualifier the list must accept
+     * @param maxQ largest qualifier the list must accept
+     * @param encodingScheme scheme used to decode the qualifier bytes of added cells
+     * @throws IllegalArgumentException if {@code minQ > maxQ}
+     */
+    public EncodedColumnQualiferCellsList(int minQ, int maxQ, QualifierEncodingScheme encodingScheme) {
+        checkArgument(minQ <= maxQ, "Invalid arguments. Min: " + minQ
+                + ". Max: " + maxQ);
+        this.minQualifier = minQ;
+        this.maxQualifier = maxQ;
+        int size;
+        if (maxQ < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
+            // Only reserved qualifiers can occur.
+            size = RESERVED_RANGE_SIZE;
+        } else {
+            // The reserved slots, followed by one slot per non-reserved qualifier in
+            // [max(minQ, ENCODED_CQ_COUNTER_INITIAL_VALUE), maxQ]. Clamping minQ matters when [minQ, maxQ]
+            // straddles the reserved boundary: the offset is then 0 and getArrayIndex() maps maxQ to slot maxQ,
+            // so a plain (maxQ - minQ + 1) would leave the array short of that slot whenever minQ > 0.
+            // NOTE(review): reserved qualifier q is stored in slot q, which presumes ENCODED_EMPTY_COLUMN_NAME
+            // is 0 (RESERVED_RANGE_SIZE must cover slots 0..ENCODED_CQ_COUNTER_INITIAL_VALUE - 1).
+            int firstNonReservedQualifier = Math.max(minQ, ENCODED_CQ_COUNTER_INITIAL_VALUE);
+            size = RESERVED_RANGE_SIZE + (maxQ - firstNonReservedQualifier + 1);
+        }
+        this.array = new Cell[size];
+        this.nonReservedRangeOffset = minQ > ENCODED_CQ_COUNTER_INITIAL_VALUE ? minQ - ENCODED_CQ_COUNTER_INITIAL_VALUE : 0;
+        this.encodingScheme = encodingScheme;
+    }
+
+    @Override
+    public int size() {
+        return numNonNullElements;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return numNonNullElements == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return indexOf(o) >= 0;
+    }
+
+    /** Returns the non-null cells in slot order. */
+    @Override
+    public Object[] toArray() {
+        Object[] toReturn = new Object[numNonNullElements];
+        int counter = 0;
+        if (numNonNullElements > 0) {
+            for (int i = 0; i < array.length; i++) {
+                if (array[i] != null) {
+                    toReturn[counter++] = array[i];
+                }
+            }
+        }
+        return toReturn;
+    }
+
+    /**
+     * Returns the non-null cells in slot order. As the {@link List} contract requires, {@code a} itself is
+     * filled and returned when it is large enough (with a null terminator if it has room to spare);
+     * otherwise a new array of the same component type is allocated.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T[] toArray(T[] a) {
+        T[] toReturn = a.length >= numNonNullElements
+                ? a
+                : (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), numNonNullElements);
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = (T) array[i];
+            }
+        }
+        if (toReturn.length > counter) {
+            toReturn[counter] = null;
+        }
+        return toReturn;
+    }
+
+    /**
+     * Stores {@code e} in the slot derived from its decoded qualifier, replacing any cell already there.
+     *
+     * @return always {@code true}
+     * @throws NullPointerException if {@code e} is null
+     * @throws IndexOutOfBoundsException if the decoded qualifier is outside the range given at construction
+     */
+    @Override
+    public boolean add(Cell e) {
+        if (e == null) {
+            throw new NullPointerException();
+        }
+        int columnQualifier = encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength());
+
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        if (array[idx] == null) {
+            numNonNullElements++;
+        }
+        array[idx] = e;
+        if (firstNonNullElementIdx == -1 || idx < firstNonNullElementIdx) {
+            firstNonNullElementIdx = idx;
+        }
+        modCount++;
+        /*
+         * Note that we don't care about equality of the element being added with the element
+         * already present at the index.
+         */
+        return true;
+    }
+
+    /**
+     * Removes the first cell equal to {@code o}. Returns {@code false} for null or for an object that equals
+     * no cell in the list.
+     */
+    @Override
+    public boolean remove(Object o) {
+        if (o == null) {
+            return false;
+        }
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null && array[i].equals(o)) {
+                removeAt(i);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object element : c) {
+            if (indexOf(element) < 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Adds every cell of {@code c} via {@link #add(Cell)}.
+     *
+     * @throws NullPointerException if {@code c} contains a null cell
+     */
+    @Override
+    public boolean addAll(Collection<? extends Cell> c) {
+        boolean changed = false;
+        for (Cell cell : c) {
+            if (cell == null) {
+                throw new NullPointerException();
+            }
+            changed |= add(cell);
+        }
+        return changed;
+    }
+
+    @Override
+    public boolean addAll(int index, Collection<? extends Cell> c) {
+        throwGenericUnsupportedOperationException();
+        return false;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        Iterator<?> itr = c.iterator();
+        boolean changed = false;
+        while (itr.hasNext()) {
+            changed |= remove(itr.next());
+        }
+        return changed;
+    }
+
+    /**
+     * Keeps only the cells for which {@code collection} holds an equal cell at the same qualifier.
+     *
+     * @throws UnsupportedOperationException if {@code collection} is not an EncodedColumnQualiferCellsList
+     */
+    @Override
+    public boolean retainAll(Collection<?> collection) {
+        boolean changed = false;
+        // Optimize if the passed collection is an instance of EncodedColumnQualiferCellsList
+        if (collection instanceof EncodedColumnQualiferCellsList) {
+            EncodedColumnQualiferCellsList list = (EncodedColumnQualiferCellsList) collection;
+            ListIterator<Cell> listItr = this.listIterator();
+            while (listItr.hasNext()) {
+                Cell cellInThis = listItr.next();
+                int qualifier = encodingScheme.decode(cellInThis.getQualifierArray(),
+                        cellInThis.getQualifierOffset(), cellInThis.getQualifierLength());
+                try {
+                    Cell cellInParam = list.getCellForColumnQualifier(qualifier);
+                    if (cellInParam != null && cellInParam.equals(cellInThis)) {
+                        continue;
+                    }
+                    listItr.remove();
+                    changed = true;
+                } catch (IndexOutOfBoundsException expected) {
+                    // this could happen when the qualifier of cellInThis lies out of
+                    // the range of the passed list.
+                    listItr.remove();
+                    changed = true;
+                }
+            }
+        } else {
+            throw new UnsupportedOperationException(
+                    "Operation only supported for collections of type EncodedColumnQualiferCellsList");
+        }
+        return changed;
+    }
+
+    @Override
+    public void clear() {
+        for (int i = 0; i < array.length; i++) {
+            array[i] = null;
+        }
+        firstNonNullElementIdx = -1;
+        numNonNullElements = 0;
+        modCount++;
+    }
+
+    /**
+     * Returns the {@code index}-th non-null cell in slot order. This scans the backing array, so it is O(n).
+     *
+     * @throws IndexOutOfBoundsException if {@code index} is negative or not less than {@link #size()}
+     */
+    @Override
+    public Cell get(int index) {
+        rangeCheck(index);
+        int numNonNullElementsFound = 0;
+        for (int i = firstNonNullElementIdx; i < array.length; i++) {
+            if (array[i] != null) {
+                numNonNullElementsFound++;
+                if (numNonNullElementsFound == index + 1) {
+                    return array[i];
+                }
+            }
+        }
+        throw new IllegalStateException("There was no element present in the list at index "
+                + index + " even though number of elements in the list are " + size());
+    }
+
+    @Override
+    public Cell set(int index, Cell e) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public void add(int index, Cell element) {
+        throwGenericUnsupportedOperationException();
+    }
+
+    @Override
+    public Cell remove(int index) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public int indexOf(Object o) {
+        if (o == null || isEmpty()) {
+            return -1;
+        } else {
+            int numNonNull = -1;
+            for (int i = 0; i < array.length; i++) {
+                if (array[i] != null) {
+                    numNonNull++;
+                }
+                if (o.equals(array[i])) {
+                    return numNonNull;
+                }
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public int lastIndexOf(Object o) {
+        if (o == null || isEmpty()) {
+            return -1;
+        }
+        int lastIndex = numNonNullElements;
+        for (int i = array.length - 1; i >= 0; i--) {
+            if (array[i] != null) {
+                lastIndex--;
+            }
+            if (o.equals(array[i])) {
+                return lastIndex;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator() {
+        return new ListItr();
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator(int index) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public List<Cell> subList(int fromIndex, int toIndex) {
+        throwGenericUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public Iterator<Cell> iterator() {
+        return new Itr();
+    }
+
+    /**
+     * Returns the cell stored for the encoded qualifier {@code qualifierBytes}, or null if there is none.
+     *
+     * @throws IndexOutOfBoundsException if the decoded qualifier is outside the range given at construction
+     */
+    public Cell getCellForColumnQualifier(byte[] qualifierBytes) {
+        int columnQualifier = encodingScheme.decode(qualifierBytes);
+        return getCellForColumnQualifier(columnQualifier);
+    }
+
+    /**
+     * Same as {@link #getCellForColumnQualifier(byte[])} for the qualifier stored in
+     * {@code qualifierBytes[offset, offset + length)}.
+     */
+    public Cell getCellForColumnQualifier(byte[] qualifierBytes, int offset, int length) {
+        int columnQualifier = encodingScheme.decode(qualifierBytes, offset, length);
+        return getCellForColumnQualifier(columnQualifier);
+    }
+
+    private Cell getCellForColumnQualifier(int columnQualifier) {
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        Cell c = array[idx];
+        return c;
+    }
+
+    /**
+     * Returns the cell in the lowest occupied slot.
+     *
+     * @throws NoSuchElementException if the list is empty
+     */
+    public Cell getFirstCell() {
+        if (firstNonNullElementIdx == -1) {
+            throw new NoSuchElementException("No elements present in the list");
+        }
+        return array[firstNonNullElementIdx];
+    }
+
+    /** Renders the non-null cells in slot order, in the {@code [a, b, c]} form used by java.util lists. */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("[");
+        boolean first = true;
+        for (Cell cell : array) {
+            if (cell != null) {
+                if (!first) {
+                    sb.append(", ");
+                }
+                sb.append(cell);
+                first = false;
+            }
+        }
+        return sb.append(']').toString();
+    }
+
+    private void checkQualifierRange(int qualifier) {
+        if (qualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
+            return; // space in the array for reserved range is always allocated.
+        }
+        if (qualifier < minQualifier || qualifier > maxQualifier) {
+            throw new IndexOutOfBoundsException("Qualifier " + qualifier
+                    + " is out of the valid range - (" + minQualifier + ", " + maxQualifier + ")");
+        }
+    }
+
+    private void rangeCheck(int index) {
+        if (index < 0 || index >= size()) {
+            throw new IndexOutOfBoundsException();
+        }
+    }
+
+    // Maps a decoded qualifier to its slot in array (see the storage-layout comment on the fields).
+    private int getArrayIndex(int columnQualifier) {
+        checkArgument(columnQualifier >= ENCODED_EMPTY_COLUMN_NAME);
+        if (columnQualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
+            return columnQualifier;
+        }
+        return columnQualifier - nonReservedRangeOffset;
+    }
+
+    // Returns the smallest occupied slot >= from, or -1 if there is none.
+    private int nextNonNullIndex(int from) {
+        for (int i = from; i < array.length; i++) {
+            if (array[i] != null) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    // Returns the largest occupied slot <= from, or -1 if there is none.
+    private int previousNonNullIndex(int from) {
+        for (int i = Math.min(from, array.length - 1); i >= 0; i--) {
+            if (array[i] != null) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    // Clears the occupied slot idx and keeps numNonNullElements, firstNonNullElementIdx and modCount in sync.
+    // Shared by remove(Object) and the iterators so every removal path maintains the same bookkeeping.
+    private void removeAt(int idx) {
+        array[idx] = null;
+        numNonNullElements--;
+        if (numNonNullElements == 0) {
+            firstNonNullElementIdx = -1;
+        } else if (idx == firstNonNullElementIdx) {
+            firstNonNullElementIdx = nextNonNullIndex(idx + 1);
+        }
+        modCount++;
+    }
+
+    private void throwGenericUnsupportedOperationException() {
+        throw new UnsupportedOperationException(
+                "Operation cannot be supported because it potentially violates the invariance contract of this list implementation");
+    }
+
+    /** Forward iterator over the occupied slots of the backing array, in slot order. */
+    private class Itr implements Iterator<Cell> {
+        // Slot of the element the next call to next() returns, or -1 once the iteration is exhausted.
+        protected int nextSlot;
+        // Slot of the element last returned, or -1 if there is none or it has already been removed.
+        protected int lastRet = -1;
+        protected int expectedModCount = modCount;
+
+        private Itr() {
+            nextSlot = nextNonNullIndex(0);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return nextSlot != -1;
+        }
+
+        @Override
+        public Cell next() {
+            checkForCoModification();
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastRet = nextSlot;
+            nextSlot = nextNonNullIndex(lastRet + 1);
+            // Traversal is not a modification: bumping modCount here would make every other live
+            // iterator over this list fail with a spurious ConcurrentModificationException.
+            return array[lastRet];
+        }
+
+        @Override
+        public void remove() {
+            if (lastRet < 0) {
+                throw new IllegalStateException();
+            }
+            checkForCoModification();
+            // nextSlot is strictly after lastRet here, so it stays valid once lastRet is cleared.
+            removeAt(lastRet);
+            lastRet = -1;
+            expectedModCount = modCount;
+        }
+
+        protected void checkForCoModification() {
+            if (modCount != expectedModCount) {
+                throw new ConcurrentModificationException();
+            }
+        }
+    }
+
+    /**
+     * Bidirectional iterator. {@link #nextIndex()} and {@link #previousIndex()} report logical list positions
+     * as the {@link ListIterator} contract requires, not slots of the backing array.
+     */
+    private class ListItr extends Itr implements ListIterator<Cell> {
+        // Slot of the element the next call to previous() returns, or -1 at the start of the list.
+        private int prevSlot = -1;
+        // Number of elements before the cursor.
+        private int cursor = 0;
+
+        private ListItr() {
+            super();
+        }
+
+        @Override
+        public Cell next() {
+            checkForCoModification();
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastRet = nextSlot;
+            prevSlot = nextSlot;
+            nextSlot = nextNonNullIndex(lastRet + 1);
+            cursor++;
+            return array[lastRet];
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return prevSlot != -1;
+        }
+
+        @Override
+        public Cell previous() {
+            checkForCoModification();
+            if (!hasPrevious()) {
+                throw new NoSuchElementException();
+            }
+            lastRet = prevSlot;
+            nextSlot = prevSlot;
+            prevSlot = previousNonNullIndex(lastRet - 1);
+            cursor--;
+            return array[lastRet];
+        }
+
+        @Override
+        public int nextIndex() {
+            return cursor;
+        }
+
+        @Override
+        public int previousIndex() {
+            return cursor - 1;
+        }
+
+        @Override
+        public void remove() {
+            if (lastRet < 0) {
+                throw new IllegalStateException();
+            }
+            checkForCoModification();
+            if (lastRet == nextSlot) {
+                // Last move was previous(): the removed cell sat right after the cursor, so the next
+                // pointer must skip past it; the cursor position itself is unchanged.
+                nextSlot = nextNonNullIndex(lastRet + 1);
+            } else {
+                // Last move was next(): the removed cell sat right before the cursor.
+                prevSlot = previousNonNullIndex(lastRet - 1);
+                cursor--;
+            }
+            removeAt(lastRet);
+            lastRet = -1;
+            expectedModCount = modCount;
+        }
+
+        /**
+         * Replaces the cell last returned. {@code e} must decode to the same slot, since every cell's position
+         * is fixed by its qualifier.
+         *
+         * @throws IllegalStateException if neither next() nor previous() was called, or remove() was called since
+         * @throws IllegalArgumentException if {@code e} belongs to a different slot
+         */
+        @Override
+        public void set(Cell e) {
+            if (lastRet == -1) {
+                throw new IllegalStateException();
+            }
+            checkForCoModification();
+            int columnQualifier = encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength());
+            int idx = getArrayIndex(columnQualifier);
+            if (idx != lastRet) {
+                throw new IllegalArgumentException("Cell " + e + " with column qualifier "
+                        + columnQualifier + " belongs at index " + idx
+                        + ". It cannot be added at the position " + lastRet
+                        + " to which the previous next() or previous() was pointing to.");
+            }
+            EncodedColumnQualiferCellsList.this.add(e);
+            expectedModCount = modCount;
+        }
+
+        @Override
+        public void add(Cell e) {
+            throwGenericUnsupportedOperationException();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
index 53f155b..d946870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
@@ -36,6 +36,7 @@ public class MultiKeyValueTuple extends BaseTuple {
}
/** Caller must not modify the list that is passed here */
+ @Override
public void setKeyValues(List<Cell> values) {
this.values = values;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
new file mode 100644
index 0000000..01a5e4d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Tuple backed by an {@link EncodedColumnQualiferCellsList}: a cell is looked up by its encoded column
+ * qualifier, which maps straight to a slot of that list, instead of by scanning the cells.
+ */
+public class PositionBasedMultiKeyValueTuple extends BaseTuple {
+    // Stays null when the no-arg constructor is used until setKeyValues() is called.
+    private EncodedColumnQualiferCellsList values;
+
+    public PositionBasedMultiKeyValueTuple() {}
+
+    /**
+     * @param values the cells of the tuple; must be an {@link EncodedColumnQualiferCellsList}
+     * @throws IllegalArgumentException if {@code values} is of any other type (including null)
+     */
+    public PositionBasedMultiKeyValueTuple(List<Cell> values) {
+        this.values = asEncodedList(values);
+    }
+
+    /**
+     * Caller must not modify the list that is passed here
+     *
+     * @throws IllegalArgumentException if {@code values} is not an {@link EncodedColumnQualiferCellsList}
+     */
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        this.values = asEncodedList(values);
+    }
+
+    // Shared validation for the constructor and setKeyValues(): only the position-based list is supported.
+    private static EncodedColumnQualiferCellsList asEncodedList(List<Cell> values) {
+        checkArgument(values instanceof EncodedColumnQualiferCellsList, "PositionBasedMultiKeyValueTuple only works with lists of type EncodedColumnQualiferCellsList");
+        return (EncodedColumnQualiferCellsList) values;
+    }
+
+    /**
+     * Points {@code ptr} at the row key of the first cell (presumably all cells share one row).
+     *
+     * @throws java.util.NoSuchElementException if the tuple holds no cells
+     */
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        Cell value = values.getFirstCell();
+        ptr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength());
+    }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
+
+    /**
+     * Returns the cell with the given encoded qualifier, or null if there is none. {@code family} is not
+     * consulted: the lookup uses the decoded qualifier only.
+     *
+     * @throws IndexOutOfBoundsException if the decoded qualifier is outside the range of the backing list
+     */
+    @Override
+    public Cell getValue(byte[] family, byte[] qualifier) {
+        return values.getCellForColumnQualifier(qualifier);
+    }
+
+    // String.valueOf keeps toString() safe for a tuple that was never given any cells.
+    @Override
+    public String toString() {
+        return String.valueOf(values);
+    }
+
+    @Override
+    public int size() {
+        return values.size();
+    }
+
+    /** Returns the {@code index}-th cell; this is a linear scan of the backing list. */
+    @Override
+    public Cell getValue(int index) {
+        return values.get(index);
+    }
+
+    /**
+     * Points {@code ptr} at the value of the cell with the given encoded qualifier.
+     *
+     * @return {@code false}, leaving {@code ptr} untouched, when no such cell exists
+     */
+    @Override
+    public boolean getValue(byte[] family, byte[] qualifier,
+            ImmutableBytesWritable ptr) {
+        Cell kv = getValue(family, qualifier);
+        if (kv == null) {
+            return false;
+        }
+        ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
new file mode 100644
index 0000000..63ba101
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+
+/**
+ * Result tuple over an {@link EncodedColumnQualiferCellsList}; cells are found by encoded qualifier and handed
+ * out as {@link KeyValue}s.
+ */
+public class PositionBasedResultTuple extends BaseTuple {
+    // Never null: the constructor rejects anything that is not an EncodedColumnQualiferCellsList.
+    private final EncodedColumnQualiferCellsList cells;
+
+    /**
+     * @param list the cells of the tuple; must be an {@link EncodedColumnQualiferCellsList}
+     * @throws IllegalArgumentException if {@code list} is of any other type (including null)
+     */
+    public PositionBasedResultTuple(List<Cell> list) {
+        checkArgument(list instanceof EncodedColumnQualiferCellsList, "Invalid list type");
+        cells = (EncodedColumnQualiferCellsList) list;
+    }
+
+    /** Points {@code ptr} at the row key held by the first cell of the list. */
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        Cell first = cells.getFirstCell();
+        ptr.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
+    }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
+
+    /** Looks the cell up by encoded qualifier ({@code family} is ignored) and converts it to a KeyValue. */
+    @Override
+    public KeyValue getValue(byte[] family, byte[] qualifier) {
+        Cell found = cells.getCellForColumnQualifier(qualifier);
+        return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(found);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder out = new StringBuilder();
+        out.append("keyvalues=");
+        if (cells == null || cells.isEmpty()) {
+            return out.append("NONE").toString();
+        }
+        out.append("{");
+        String separator = "";
+        for (Cell kv : cells) {
+            out.append(separator)
+                    .append(kv.toString())
+                    .append("/value=")
+                    .append(Bytes.toString(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+            separator = ", \n";
+        }
+        out.append("}\n");
+        return out.toString();
+    }
+
+    @Override
+    public int size() {
+        return cells.size();
+    }
+
+    /** Index 0 uses the direct first-cell lookup; any other index goes through the list's linear get(int). */
+    @Override
+    public KeyValue getValue(int index) {
+        Cell target;
+        if (index == 0) {
+            target = cells.getFirstCell();
+        } else {
+            target = cells.get(index);
+        }
+        return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(target);
+    }
+
+    /** Points {@code ptr} at the cell's value; returns {@code false} (ptr untouched) when there is no such cell. */
+    @Override
+    public boolean getValue(byte[] family, byte[] qualifier,
+            ImmutableBytesWritable ptr) {
+        KeyValue kv = getValue(family, qualifier);
+        boolean found = kv != null;
+        if (found) {
+            ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        }
+        return found;
+    }
+
+    /** Returns an iterator over the cells in slot order; its remove() writes through to the list. */
+    public Iterator<Cell> getTupleIterator() {
+        return new TupleIterator(cells.iterator());
+    }
+
+    // Plain pass-through wrapper around the list's iterator.
+    private static class TupleIterator implements Iterator<Cell> {
+
+        private final Iterator<Cell> inner;
+
+        private TupleIterator(Iterator<Cell> inner) {
+            this.inner = inner;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return inner.hasNext();
+        }
+
+        @Override
+        public Cell next() {
+            return inner.next();
+        }
+
+        @Override
+        public void remove() {
+            inner.remove();
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
index c28a2bf..3774837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.schema.tuple;
+import java.util.Collections;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
@@ -25,25 +27,23 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.util.KeyValueUtil;
-
+/**
+ *
+ * Wrapper around {@link Result} that implements Phoenix's {@link Tuple} interface.
+ *
+ */
public class ResultTuple extends BaseTuple {
- private Result result;
+ private final Result result;
+ public static final ResultTuple EMPTY_TUPLE = new ResultTuple(Result.create(Collections.<Cell>emptyList()));
public ResultTuple(Result result) {
this.result = result;
}
- public ResultTuple() {
- }
-
public Result getResult() {
return this.result;
}
- public void setResult(Result result) {
- this.result = result;
- }
-
@Override
public void getKey(ImmutableBytesWritable ptr) {
ptr.set(result.getRow());
@@ -104,4 +104,4 @@ public class ResultTuple extends BaseTuple {
ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
return true;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
index 61b2a4f..e4a887b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.schema.tuple;
+import java.util.List;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -87,4 +89,6 @@ public interface Tuple {
* @return the current or next sequence value
*/
public long getSequenceValue(int index);
+
+ public void setKeyValues(List<Cell> values);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
index 1d2cfb2..f31f272 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
@@ -22,19 +22,15 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Types;
import java.text.Format;
-import java.util.LinkedList;
-import java.util.List;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ConstraintViolationException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.ValueSchema;
-import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
@@ -74,8 +70,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
this, actualModifer, desiredModifier, true);
}
- public static final byte ARRAY_SERIALIZATION_VERSION = 1;
-
+ // array serialization format where bytes can be used as part of the row key
+ public static final byte SORTABLE_SERIALIZATION_VERSION = 1;
+ // array serialization format where bytes are immutable (does not support prepend/append or sorting)
+ public static final byte IMMUTABLE_SERIALIZATION_VERSION = 2;
+
protected PArrayDataType(String sqlTypeName, int sqlType, Class clazz, PDataCodec codec, int ordinal) {
super(sqlTypeName, sqlType, clazz, codec, ordinal);
}
@@ -186,9 +185,17 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
oStream.write(sepByte);
}
- public static boolean useShortForOffsetArray(int maxOffset) {
- // If the max offset is less than Short.MAX_VALUE then offset array can use short
- if (maxOffset <= (2 * Short.MAX_VALUE)) { return true; }
+ // this method is only for append/prepend/concat operations which are only supported for the SORTABLE_SERIALIZATION_VERSION
+ public static boolean useShortForOffsetArray(int maxoffset) {
+ return useShortForOffsetArray(maxoffset, SORTABLE_SERIALIZATION_VERSION);
+ }
+
+ public static boolean useShortForOffsetArray(int maxoffset, byte serializationVersion) {
+ if (serializationVersion == IMMUTABLE_SERIALIZATION_VERSION) {
+ return (maxoffset <= Short.MAX_VALUE && maxoffset >= Short.MIN_VALUE );
+ }
+ // If the max offset is less than Short.MAX_VALUE then offset array can use short
+ else if (maxoffset <= (2 * Short.MAX_VALUE)) { return true; }
// else offset array can use Int
return false;
}
@@ -342,126 +349,20 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
return createPhoenixArray(bytes, offset, length, sortOrder, baseType, maxLength, desiredDataType);
}
- public static boolean positionAtArrayElement(Tuple tuple, ImmutableBytesWritable ptr, int index,
- Expression arrayExpr, PDataType pDataType, Integer maxLen) {
- if (!arrayExpr.evaluate(tuple, ptr)) {
- return false;
- } else if (ptr.getLength() == 0) { return true; }
-
- // Given a ptr to the entire array, set ptr to point to a particular element within that array
- // given the type of an array element (see comments in PDataTypeForArray)
- positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
- return true;
- }
-
- public static void positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
- Integer byteSize) {
- byte[] bytes = ptr.get();
- int initPos = ptr.getOffset();
- if (!baseDataType.isFixedWidth()) {
- int noOfElements = Bytes.toInt(bytes,
- (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
- boolean useShort = true;
- if (noOfElements < 0) {
- noOfElements = -noOfElements;
- useShort = false;
- }
- if (arrayIndex >= noOfElements) {
- ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
- return;
- }
-
- int indexOffset = Bytes.toInt(bytes,
- (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
- if (arrayIndex >= noOfElements) {
- ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
- } else {
- // Skip those many offsets as given in the arrayIndex
- // If suppose there are 5 elements in the array and the arrayIndex = 3
- // This means we need to read the 4th element of the array
- // So inorder to know the length of the 4th element we will read the offset of 4th element and the
- // offset of 5th element.
- // Subtracting the offset of 5th element and 4th element will give the length of 4th element
- // So we could just skip reading the other elements.
- int currOffset = getOffset(bytes, arrayIndex, useShort, indexOffset);
- int elementLength = 0;
- if (arrayIndex == (noOfElements - 1)) {
- elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
- - (currOffset + initPos) - 3;
- } else {
- elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
- arrayIndex + 1, useShort, indexOffset) - currOffset - 1;
- }
- ptr.set(bytes, currOffset + initPos, elementLength);
- }
- } else {
- int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize);
- int offset = arrayIndex * elemByteSize;
- if (offset >= ptr.getLength()) {
- ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
- } else {
- ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
- }
- }
- }
-
- public static void positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
- Integer byteSize, int offset, int length, int noOfElements, boolean first) {
- byte[] bytes = ptr.get();
- if (!baseDataType.isFixedWidth()) {
- int indexOffset = Bytes.toInt(bytes, (offset + length - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT)))
- + offset;
- boolean useShort = true;
- if (first) {
- int count = Bytes.toInt(bytes,
- (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
- if (count < 0) {
- count = -count;
- useShort = false;
- }
- }
- if (arrayIndex >= noOfElements) {
- return;
- } else {
- // Skip those many offsets as given in the arrayIndex
- // If suppose there are 5 elements in the array and the arrayIndex = 3
- // This means we need to read the 4th element of the array
- // So inorder to know the length of the 4th element we will read the offset of 4th element and the
- // offset of 5th element.
- // Subtracting the offset of 5th element and 4th element will give the length of 4th element
- // So we could just skip reading the other elements.
- int currOffset = getOffset(bytes, arrayIndex, useShort, indexOffset);
- int elementLength = 0;
- if (arrayIndex == (noOfElements - 1)) {
- elementLength = (bytes[currOffset + offset] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
- - (currOffset + offset) - 3;
- } else {
- elementLength = (bytes[currOffset + offset] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
- arrayIndex + 1, useShort, indexOffset) - currOffset - 1;
- }
- ptr.set(bytes, currOffset + offset, elementLength);
- }
- } else {
- int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize);
- offset += arrayIndex * elemByteSize;
- if (offset >= offset + length) {
- return;
- } else {
- ptr.set(bytes, offset, elemByteSize);
- }
- }
+ static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) { // element position, ignoring the sign used to mark absent IMMUTABLE elements
+ return Math.abs(getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion));
}
- private static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset) {
- int offset;
+ static int getSerializedOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) { // raw stored offset; negative means absent (IMMUTABLE only)
+ int offset;
if (useShort) {
offset = indexOffset + (Bytes.SIZEOF_SHORT * arrayIndex);
- return Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT) + Short.MAX_VALUE;
+ return Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT) + (serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION ? 0 : Short.MAX_VALUE); // SORTABLE shorts are stored shifted by -Short.MAX_VALUE
} else {
offset = indexOffset + (Bytes.SIZEOF_INT * arrayIndex);
return Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
}
- }
+ }
private static int getOffset(ByteBuffer indexBuffer, int arrayIndex, boolean useShort, int indexOffset) {
int offset;
@@ -484,58 +385,18 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
}
/**
- * creates array bytes
+ * Creates the serialized array bytes using the SORTABLE_SERIALIZATION_VERSION format.
* @param rowKeyOrderOptimizable TODO
*/
private byte[] createArrayBytes(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
PhoenixArray array, int noOfElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
- try {
- if (!baseType.isFixedWidth()) {
- int[] offsetPos = new int[noOfElements];
- int nulls = 0;
- for (int i = 0; i < noOfElements; i++) {
- byte[] bytes = array.toBytes(i);
- if (bytes.length == 0) {
- offsetPos[i] = byteStream.size();
- nulls++;
- } else {
- nulls = serializeNulls(oStream, nulls);
- offsetPos[i] = byteStream.size();
- if (sortOrder == SortOrder.DESC) {
- SortOrder.invert(bytes, 0, bytes, 0, bytes.length);
- }
- oStream.write(bytes, 0, bytes.length);
- oStream.write(getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
- }
- }
- // Double seperator byte to show end of the non null array
- writeEndSeperatorForVarLengthArray(oStream, sortOrder, rowKeyOrderOptimizable);
- noOfElements = PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
- offsetPos[offsetPos.length - 1], offsetPos);
- serializeHeaderInfoIntoStream(oStream, noOfElements);
- } else {
- for (int i = 0; i < noOfElements; i++) {
- byte[] bytes = array.toBytes(i);
- int length = bytes.length;
- if (sortOrder == SortOrder.DESC) {
- SortOrder.invert(bytes, 0, bytes, 0, bytes.length);
- }
- oStream.write(bytes, 0, length);
- }
- }
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(byteStream.getBuffer(), 0, byteStream.size());
- return ByteUtil.copyKeyBytesIfNecessary(ptr);
- } catch (IOException e) {
- try {
- byteStream.close();
- oStream.close();
- } catch (IOException ioe) {
-
- }
+ PArrayDataTypeEncoder builder = // this 6-arg constructor selects SORTABLE_SERIALIZATION_VERSION
+ new PArrayDataTypeEncoder(byteStream, oStream, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
+ for (int i = 0; i < noOfElements; i++) {
+ byte[] bytes = array.toBytes(i);
+ builder.appendValue(bytes); // for variable-width types a zero-length value becomes a null element
}
- // This should not happen
- return null;
+ return builder.encode(); // encode() also closes byteStream and oStream
}
public static boolean appendItemToArray(ImmutableBytesWritable ptr, int length, int offset, byte[] arrayBytes,
@@ -557,7 +418,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
byte[] newArray;
if (!baseType.isFixedWidth()) {
-
+ byte serializationVersion = arrayBytes[offset + length - Bytes.SIZEOF_BYTE];
int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
- Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
int offsetArrayLength = length - offsetArrayPosition - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
@@ -612,7 +473,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
int off = newOffsetArrayPosition;
for (int arrayIndex = 0; arrayIndex < Math.abs(arrayLength) - 1; arrayIndex++) {
Bytes.putInt(newArray, off,
- getOffset(arrayBytes, arrayIndex, true, offsetArrayPosition + offset));
+ getOffset(arrayBytes, arrayIndex, true, offsetArrayPosition + offset, serializationVersion));
off += Bytes.SIZEOF_INT;
}
@@ -659,6 +520,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
byte[] newArray;
if (!baseType.isFixedWidth()) {
+ byte serializationVersion = arrayBytes[offset + length - Bytes.SIZEOF_BYTE];
int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
- Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
int offsetArrayLength = length - offsetArrayPosition - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
@@ -668,7 +530,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
// checks whether offset array consists of shorts or integers
boolean useInt = offsetArrayLength / arrayLength == Bytes.SIZEOF_INT;
boolean convertToInt = false;
- int endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset)
+ int endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset, serializationVersion)
+ elementLength + Bytes.SIZEOF_BYTE;
int newOffsetArrayPosition;
int lengthIncrease;
@@ -679,7 +541,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
int nulls = 1;
// counts the number of nulls which are already at the beginning of the array
for (int index = 0; index < arrayLength; index++) {
- int currOffset = getOffset(arrayBytes, index, !useInt, offsetArrayPosition + offset);
+ int currOffset = getOffset(arrayBytes, index, !useInt, offsetArrayPosition + offset, serializationVersion);
if (arrayBytes[offset + currOffset] == QueryConstants.SEPARATOR_BYTE) {
nulls++;
} else {
@@ -709,7 +571,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
// ex: initial array - 0 45(inverted) 65 0 66 0 0 0 after prepending null - 0 46(inverted) 65 0 66 0 0 0
lengthIncrease = nRemainingNulls == 1 ? (nMultiplesOver255 == 0 ? 2 * Bytes.SIZEOF_BYTE
: Bytes.SIZEOF_BYTE) : 0;
- endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset)
+ endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset, serializationVersion)
+ lengthIncrease;
if (!useInt) {
if (PArrayDataType.useShortForOffsetArray(endElementPosition)) {
@@ -785,8 +647,9 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
currentPosition += offsetArrayElementSize;
boolean nullsAtBeginning = true;
+ byte serializationVersion = arrayBytes[offset + length - Bytes.SIZEOF_BYTE];
for (int arrayIndex = 0; arrayIndex < arrayLength - 1; arrayIndex++) {
- int oldOffset = getOffset(arrayBytes, arrayIndex, useShortPrevious, offsetArrayPosition + offset);
+ int oldOffset = getOffset(arrayBytes, arrayIndex, useShortPrevious, offsetArrayPosition + offset, serializationVersion);
if (arrayBytes[offset + oldOffset] == QueryConstants.SEPARATOR_BYTE && nullsAtBeginning) {
if (useShortNew) {
Bytes.putShort(newArray, currentPosition, (short)(oldOffset - Short.MAX_VALUE));
@@ -820,6 +683,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
byte[] newArray;
if (!baseType.isFixedWidth()) {
+ byte serializationVersion1 = array1Bytes[array1BytesOffset + array1BytesLength - Bytes.SIZEOF_BYTE];
int offsetArrayPositionArray1 = Bytes.toInt(array1Bytes, array1BytesOffset + array1BytesLength
- Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
int offsetArrayPositionArray2 = Bytes.toInt(array2Bytes, array2BytesOffset + array2BytesLength
@@ -837,7 +701,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
boolean useIntNewArray = false;
// count nulls at the end of array 1
for (int index = actualLengthOfArray1 - 1; index > -1; index--) {
- int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset + offsetArrayPositionArray1);
+ int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset + offsetArrayPositionArray1, serializationVersion1);
if (array1Bytes[array1BytesOffset + offset] == QueryConstants.SEPARATOR_BYTE || array1Bytes[array1BytesOffset + offset] == QueryConstants.DESC_SEPARATOR_BYTE) {
nullsAtTheEndOfArray1++;
} else {
@@ -847,8 +711,9 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
// count nulls at the beginning of the array 2
int array2FirstNonNullElementOffset = 0;
int array2FirstNonNullIndex = 0;
+ byte serializationVersion2 = array2Bytes[array2BytesOffset + array2BytesLength - Bytes.SIZEOF_BYTE];
for (int index = 0; index < actualLengthOfArray2; index++) {
- int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset + offsetArrayPositionArray2);
+ int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset + offsetArrayPositionArray2, serializationVersion2);
if (array2Bytes[array2BytesOffset + offset] == QueryConstants.SEPARATOR_BYTE) {
nullsAtTheBeginningOfArray2++;
} else {
@@ -870,7 +735,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
int newOffsetArrayPosition = offsetArrayPositionArray1 + offsetArrayPositionArray2 + lengthIncreaseForNulls
- 2 * Bytes.SIZEOF_BYTE;
int endElementPositionOfArray2 = getOffset(array2Bytes, actualLengthOfArray2 - 1, !useIntArray2,
- array2BytesOffset + offsetArrayPositionArray2);
+ array2BytesOffset + offsetArrayPositionArray2, serializationVersion2);
int newEndElementPosition = lengthIncreaseForNulls + endElementPositionOfArray2 + offsetArrayPositionArray1
- 2 * Bytes.SIZEOF_BYTE;
// Creates a byte array to store the concatenated array
@@ -902,14 +767,14 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
// offsets for the elements from array 1. Simply copied.
for (int index = 0; index < actualLengthOfArray1; index++) {
int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset
- + offsetArrayPositionArray1);
+ + offsetArrayPositionArray1, serializationVersion1);
Bytes.putInt(newArray, currentPosition, offset);
currentPosition += Bytes.SIZEOF_INT;
}
// offsets for nulls in the middle
for (int index = 0; index < array2FirstNonNullIndex; index++) {
int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
- + offsetArrayPositionArray2);
+ + offsetArrayPositionArray2, serializationVersion2);
Bytes.putInt(newArray, currentPosition, offset + array2StartingPosition);
currentPosition += Bytes.SIZEOF_INT;
}
@@ -918,7 +783,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
+ (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
for (int index = array2FirstNonNullIndex; index < actualLengthOfArray2; index++) {
int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
- + offsetArrayPositionArray2);
+ + offsetArrayPositionArray2, serializationVersion2);
Bytes.putInt(newArray, currentPosition, offset - array2FirstNonNullElementOffset
+ part2NonNullStartingPosition);
currentPosition += Bytes.SIZEOF_INT;
@@ -927,14 +792,14 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
// offsets for the elements from array 1. Simply copied.
for (int index = 0; index < actualLengthOfArray1; index++) {
int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset
- + offsetArrayPositionArray1);
+ + offsetArrayPositionArray1, serializationVersion1);
Bytes.putShort(newArray, currentPosition, (short)(offset - Short.MAX_VALUE));
currentPosition += Bytes.SIZEOF_SHORT;
}
// offsets for nulls in the middle
for (int index = 0; index < array2FirstNonNullIndex; index++) {
int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
- + offsetArrayPositionArray2);
+ + offsetArrayPositionArray2, serializationVersion2);
Bytes.putShort(newArray, currentPosition,
(short)(offset + array2StartingPosition - Short.MAX_VALUE));
currentPosition += Bytes.SIZEOF_SHORT;
@@ -944,7 +809,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
+ (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
for (int index = array2FirstNonNullIndex; index < actualLengthOfArray2; index++) {
int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
- + offsetArrayPositionArray2);
+ + offsetArrayPositionArray2, serializationVersion2);
Bytes.putShort(newArray, currentPosition, (short)(offset - array2FirstNonNullElementOffset
+ part2NonNullStartingPosition - Short.MAX_VALUE));
currentPosition += Bytes.SIZEOF_SHORT;
@@ -1013,13 +878,13 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
ptr.set(PVarcharArray.INSTANCE.toBytes(phoenixArray, PVarchar.INSTANCE, sortOrder));
return true;
}
-
- public static int serailizeOffsetArrayIntoStream(DataOutputStream oStream, TrustedByteArrayOutputStream byteStream,
- int noOfElements, int maxOffset, int[] offsetPos) throws IOException {
+
+ public static int serializeOffsetArrayIntoStream(DataOutputStream oStream, TrustedByteArrayOutputStream byteStream,
+ int noOfElements, int maxOffset, int[] offsetPos, byte serializationVersion) throws IOException {
int offsetPosition = (byteStream.size());
byte[] offsetArr = null;
boolean useInt = true;
- if (PArrayDataType.useShortForOffsetArray(maxOffset)) {
+ if (PArrayDataType.useShortForOffsetArray(maxOffset, serializationVersion)) {
offsetArr = new byte[PArrayDataType.initOffsetArray(noOfElements, Bytes.SIZEOF_SHORT)];
useInt = false;
} else {
@@ -1034,7 +899,8 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
}
} else {
for (int pos : offsetPos) {
- Bytes.putShort(offsetArr, off, (short)(pos - Short.MAX_VALUE));
+ short val = serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION ? (short)pos : (short)(pos - Short.MAX_VALUE);
+ Bytes.putShort(offsetArr, off, val);
off += Bytes.SIZEOF_SHORT;
}
}
@@ -1043,18 +909,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
return noOfElements;
}
- public static void serializeHeaderInfoIntoBuffer(ByteBuffer buffer, int noOfElements) {
- // No of elements
- buffer.putInt(noOfElements);
- // Version of the array
- buffer.put(ARRAY_SERIALIZATION_VERSION);
- }
-
- public static void serializeHeaderInfoIntoStream(DataOutputStream oStream, int noOfElements) throws IOException {
+ public static void serializeHeaderInfoIntoStream(DataOutputStream oStream, int noOfElements, byte serializationVersion) throws IOException {
// No of elements
oStream.writeInt(noOfElements);
// Version of the array
- oStream.write(ARRAY_SERIALIZATION_VERSION);
+ oStream.write(serializationVersion); // last byte of the array; decoders read the version back from here
}
public static int initOffsetArray(int noOfElements, int baseSize) {
@@ -1228,91 +1087,4 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
buf.append(']');
return buf.toString();
}
-
- // FIXME: remove this duplicate code
- static public class PArrayDataTypeBytesArrayBuilder<T> {
- static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
-
- private PDataType baseType;
- private SortOrder sortOrder;
- private List<Integer> offsetPos;
- private TrustedByteArrayOutputStream byteStream;
- private DataOutputStream oStream;
- private int nulls;
-
- public PArrayDataTypeBytesArrayBuilder(PDataType baseType, SortOrder sortOrder) {
- this.baseType = baseType;
- this.sortOrder = sortOrder;
- offsetPos = new LinkedList<Integer>();
- byteStream = new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE);
- oStream = new DataOutputStream(byteStream);
- nulls = 0;
- }
-
- private void close() {
- try {
- if (byteStream != null) byteStream.close();
- if (oStream != null) oStream.close();
- byteStream = null;
- oStream = null;
- } catch (IOException ioe) {}
- }
-
- public boolean appendElem(byte[] bytes) {
- return appendElem(bytes, 0, bytes.length);
- }
-
- public boolean appendElem(byte[] bytes, int offset, int len) {
- if (oStream == null || byteStream == null) return false;
- try {
- if (!baseType.isFixedWidth()) {
- if (len == 0) {
- offsetPos.add(byteStream.size());
- nulls++;
- } else {
- nulls = serializeNulls(oStream, nulls);
- offsetPos.add(byteStream.size());
- if (sortOrder == SortOrder.DESC) {
- SortOrder.invert(bytes, offset, bytes, offset, len);
- offset = 0;
- }
- oStream.write(bytes, offset, len);
- oStream.write(getSeparatorByte(true, sortOrder));
- }
- } else {
- if (sortOrder == SortOrder.DESC) {
- SortOrder.invert(bytes, offset, bytes, offset, len);
- offset = 0;
- }
- oStream.write(bytes, offset, len);
- }
- return true;
- } catch (IOException e) {}
- return false;
- }
-
- public byte[] getBytesAndClose(SortOrder sortOrder) {
- try {
- if (!baseType.isFixedWidth()) {
- int noOfElements = offsetPos.size();
- int[] offsetPosArray = new int[noOfElements];
- int index = 0;
- for (Integer i : offsetPos) {
- offsetPosArray[index] = i;
- ++index;
- }
- PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, sortOrder);
- noOfElements = PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
- offsetPosArray[offsetPosArray.length - 1], offsetPosArray);
- serializeHeaderInfoIntoStream(oStream, noOfElements);
- }
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(byteStream.getBuffer(), 0, byteStream.size());
- return ByteUtil.copyKeyBytesIfNecessary(ptr);
- } catch (IOException e) {} finally {
- close();
- }
- return null;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
new file mode 100644
index 0000000..7a6ea91
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnValueDecoder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+
+/** Locates a single element inside a serialized Phoenix array (SORTABLE or IMMUTABLE serialization version). */
+public class PArrayDataTypeDecoder implements ColumnValueDecoder {
+ // Repositions ptr on the element at 0-based index, treating the array's elements as VARBINARY.
+ @Override
+ public boolean decode(ImmutableBytesWritable ptr, int index) {
+ return PArrayDataTypeDecoder.positionAtArrayElement(ptr, index, PVarbinary.INSTANCE, null);
+ }
+ // Evaluates arrayExpr into ptr, then positions ptr on the 1-based element index; returns false if evaluation fails, true (ptr left empty) for an empty array value.
+ public static boolean positionAtArrayElement(Tuple tuple, ImmutableBytesWritable ptr, int index,
+ Expression arrayExpr, PDataType pDataType, Integer maxLen) {
+ if (!arrayExpr.evaluate(tuple, ptr)) {
+ return false;
+ } else if (ptr.getLength() == 0) { return true; }
+
+ // Given a ptr to the entire array, set ptr to point to a particular element within that array,
+ // converting the 1-based SQL index into the 0-based arrayIndex expected by the overload below
+ return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
+ }
+ // Positions ptr on the 0-based element arrayIndex. Returns false (ptr emptied) if a variable-width index is out of range or the element is absent; an out-of-range fixed-width index empties ptr but returns true.
+ public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
+ Integer byteSize) {
+ byte[] bytes = ptr.get();
+ int initPos = ptr.getOffset();
+ if (!baseDataType.isFixedWidth()) {
+ byte serializationVersion = bytes[ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_BYTE]; // the last byte of the array is its serialization version
+ int noOfElements = Bytes.toInt(bytes,
+ (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
+ boolean useShort = true;
+ if (noOfElements < 0) { // a negative element count signals an int (not short) offset array
+ noOfElements = -noOfElements;
+ useShort = false;
+ }
+ if (arrayIndex >= noOfElements) {
+ ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ return false;
+ }
+ // indexOffset: absolute position of the offset array, stored as an int just before the element count
+ int indexOffset = Bytes.toInt(bytes,
+ (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
+ // Jump straight to the offsets of the requested element: e.g. with 5 elements and arrayIndex = 3
+ // we want the 4th element, so we read the offsets of the 4th and the 5th elements.
+ // The element's length is the 5th offset minus the 4th offset, less the separator byte
+ // that follows every element in the SORTABLE format.
+ // The last element has no successor, so it is bounded by the start of the offset array
+ // (less the trailing separator bytes in the SORTABLE format).
+ // No other element needs to be read.
+ int currOffset = PArrayDataType.getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion);
+ if (currOffset<0) { // a negative stored offset marks an absent element
+ ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ return false;
+ }
+ int elementLength = 0; // stays 0 (a null element) when the element starts with a separator byte
+ if (arrayIndex == (noOfElements - 1)) {
+ int separatorBytes = serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 3 : 0; // SORTABLE: element separator + two end-of-array separators
+ elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
+ - (currOffset + initPos) - separatorBytes;
+ } else {
+ int separatorByte = serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 1 : 0; // SORTABLE: one separator byte follows each element
+ elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : PArrayDataType.getOffset(bytes,
+ arrayIndex + 1, useShort, indexOffset, serializationVersion) - currOffset - separatorByte;
+ }
+ ptr.set(bytes, currOffset + initPos, elementLength);
+ } else {
+ int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize); // fixed width: element i starts at i * elemByteSize
+ int offset = arrayIndex * elemByteSize;
+ if (offset >= ptr.getLength()) {
+ ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ } else {
+ ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
+ }
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
new file mode 100644
index 0000000..bb293bb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.ColumnValueEncoder;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+/**
+ * Serializes the elements of a Phoenix array, one value at a time, in either the
+ * {@link PArrayDataType#SORTABLE_SERIALIZATION_VERSION} or the
+ * {@link PArrayDataType#IMMUTABLE_SERIALIZATION_VERSION} layout.
+ * <p>
+ * An instance is single use: {@link #encode()} closes the underlying streams.
+ */
+public class PArrayDataTypeEncoder implements ColumnValueEncoder {
+    static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
+
+    private final PDataType baseType;
+    private final SortOrder sortOrder;
+    // start position (in byteStream) of every variable-width element, negated for absent ones
+    private final List<Integer> offsetPos;
+    private TrustedByteArrayOutputStream byteStream;
+    private DataOutputStream oStream;
+    // pending nulls, flushed by PArrayDataType.serializeNulls before the next non-null element
+    private int nulls;
+    private final byte serializationVersion;
+    private final boolean rowKeyOrderOptimizable;
+
+    /**
+     * Creates an encoder with its own buffer that uses the SORTABLE serialization format with
+     * row-key-order-optimizable separators.
+     */
+    public PArrayDataTypeEncoder(PDataType baseType, SortOrder sortOrder) {
+        this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), new LinkedList<Integer>(), baseType, sortOrder, true);
+    }
+
+    /**
+     * Creates an encoder writing through the given streams in the given serialization format.
+     * {@code numElements} is only used to presize the offset list.
+     */
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
+    }
+
+    /**
+     * Creates an encoder writing through the given streams in the SORTABLE serialization format.
+     * {@code numElements} is only used to presize the offset list.
+     */
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+
+    /**
+     * Creates an encoder over {@code byteStream} in the SORTABLE serialization format, recording
+     * element offsets into the supplied {@code offsetPos} list.
+     */
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream,
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
+        this(byteStream, new DataOutputStream(byteStream), offsetPos, baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+
+    /**
+     * Primary constructor. {@code oStream} is expected to write into {@code byteStream}, whose
+     * size is used to record element offsets.
+     */
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
+        this.baseType = baseType;
+        this.sortOrder = sortOrder;
+        this.offsetPos = offsetPos;
+        this.byteStream = byteStream;
+        this.oStream = oStream;
+        this.nulls = 0;
+        this.serializationVersion = serializationVersion;
+        this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
+    }
+
+    // Closes both streams on a best-effort basis and always drops the references to them.
+    private void close() {
+        try {
+            if (byteStream != null) byteStream.close();
+            if (oStream != null) oStream.close();
+        } catch (IOException ignored) {
+            // best effort: the result has already been copied out, or encoding already failed
+        } finally {
+            byteStream = null;
+            oStream = null;
+        }
+    }
+
+    /**
+     * Records an absent (as opposed to null) element. Only variable-width base types in the
+     * IMMUTABLE serialization format support this; the element's offset is stored negated.
+     *
+     * @throws UnsupportedOperationException for any other format or base type
+     */
+    @Override
+    public void appendAbsentValue() {
+        if (serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
+            // NOTE(review): before anything has been written, -byteStream.size() is 0, so an absent
+            // first element cannot be told apart from a present one by a negative-offset check.
+            offsetPos.add(-byteStream.size());
+            nulls++;
+        } else {
+            throw new UnsupportedOperationException("Cannot represent an absent element");
+        }
+    }
+
+    /** Appends an element whose value is the whole of {@code bytes}. */
+    public void appendValue(byte[] bytes) {
+        appendValue(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Appends one element from {@code bytes[offset, offset + len)}. For variable-width base types
+     * a zero-length value is recorded as null. For DESC sort order the value is inverted into a
+     * copy, so the caller's array is left unmodified.
+     */
+    @Override
+    public void appendValue(byte[] bytes, int offset, int len) {
+        try {
+            if (!baseType.isFixedWidth()) {
+                // Variable-length elements (SORTABLE format) are laid out as follows:
+                // every element is followed by a separator byte '0';
+                // null elements are counted, and once the next non-null element appears the
+                // count of nulls is written, prefixed with a separator byte;
+                // trailing nulls are not taken into account;
+                // the last non-null element is followed by two more separator bytes.
+                // For example
+                // a, b, null, null, c, null would be
+                // 65 0 66 0 0 2 67 0 0 0
+                // a null null null b c null d would be
+                // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
+                // The IMMUTABLE format omits the per-element and end-of-array separator bytes.
+                if (len == 0) {
+                    offsetPos.add(byteStream.size());
+                    nulls++;
+                } else {
+                    nulls = PArrayDataType.serializeNulls(oStream, nulls);
+                    offsetPos.add(byteStream.size());
+                    writeValue(bytes, offset, len);
+                    if (serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                        oStream.write(PArrayDataType.getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
+                    }
+                }
+            } else {
+                // No nulls for fixed length
+                writeValue(bytes, offset, len);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to append array element", e);
+        }
+    }
+
+    // Writes bytes[offset, offset + len), inverting into a scratch copy first for DESC sort order.
+    private void writeValue(byte[] bytes, int offset, int len) throws IOException {
+        if (sortOrder == SortOrder.DESC) {
+            byte[] inverted = new byte[len];
+            SortOrder.invert(bytes, offset, inverted, 0, len);
+            oStream.write(inverted, 0, len);
+        } else {
+            oStream.write(bytes, offset, len);
+        }
+    }
+
+    /**
+     * Finishes the array and returns its serialized bytes. For variable-width base types this
+     * writes the end-of-array separators (SORTABLE format only), the offset array and the
+     * header. The encoder's streams are closed afterwards, so it must not be reused.
+     *
+     * @return the serialized array; an empty array when no variable-width element was appended
+     */
+    @Override
+    public byte[] encode() {
+        try {
+            if (!baseType.isFixedWidth()) {
+                int noOfElements = offsetPos.size();
+                if (noOfElements == 0) {
+                    // there is no last offset to size the offset array from; like the fixed-width
+                    // path, which writes nothing for no elements, produce no bytes
+                    return ByteUtil.EMPTY_BYTE_ARRAY;
+                }
+                int[] offsetPosArray = new int[noOfElements];
+                int index = 0;
+                for (Integer pos : offsetPos) {
+                    offsetPosArray[index++] = pos;
+                }
+                if (serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                    // Double separator byte to show end of the non null array
+                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, sortOrder, rowKeyOrderOptimizable);
+                }
+                // offsets are appended in increasing (absolute) order, so the last one is the largest
+                noOfElements = PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
+                        offsetPosArray[noOfElements - 1], offsetPosArray, serializationVersion);
+                PArrayDataType.serializeHeaderInfoIntoStream(oStream, noOfElements, serializationVersion);
+            }
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
+            return ByteUtil.copyKeyBytesIfNecessary(ptr);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to encode array", e);
+        } finally {
+            close();
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
new file mode 100644
index 0000000..59e99fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.DelegateExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+ public class EncodedColumnsUtil {
+ 
+     /**
+      * Returns whether the given table stores its non-PK columns under encoded (number based) column qualifiers.
+      *
+      * @param table the table to inspect
+      * @return {@code true} if the table's encoding scheme is set and is not {@code NON_ENCODED_QUALIFIERS}
+      */
+     public static boolean usesEncodedColumnNames(PTable table) {
+         return usesEncodedColumnNames(table.getEncodingScheme());
+     }
+ 
+     /**
+      * Returns whether the given qualifier encoding scheme encodes column qualifiers.
+      *
+      * @param encodingScheme the scheme to inspect, may be {@code null}
+      * @return {@code false} for {@code null} or {@code NON_ENCODED_QUALIFIERS}, {@code true} otherwise
+      */
+     public static boolean usesEncodedColumnNames(QualifierEncodingScheme encodingScheme) {
+         return encodingScheme != null && encodingScheme != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+     }
+ 
+     /**
+      * Adds the storage location of {@code column} to the projection of {@code scan}.
+      *
+      * @param column the column to project
+      * @param table  the table the column belongs to; its immutable storage scheme decides what is projected
+      * @param scan   the scan to add the family or column to
+      */
+     public static void setColumns(PColumn column, PTable table, Scan scan) {
+         if (table.getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+             // With SINGLE_CELL_ARRAY_WITH_OFFSETS all columns of a column family are stored in a single cell
+             // (with the qualifier name being the same as the family name), so project the whole column family
+             // here so that estimatedByteSize can be calculated correctly in ProjectionCompiler.
+             scan.addFamily(column.getFamilyName().getBytes());
+         } else if (column.getColumnQualifierBytes() != null) {
+             // Columns without qualifier bytes (e.g. PK columns, see getColumnQualifierBytes) are not projected.
+             scan.addColumn(column.getFamilyName().getBytes(), column.getColumnQualifierBytes());
+         }
+     }
+ 
+     /**
+      * Reads the qualifier encoding scheme serialized into the scan attributes.
+      *
+      * @param s the scan to read from
+      * @return the serialized scheme, or {@code NON_ENCODED_QUALIFIERS} when the attribute is absent
+      */
+     public static QualifierEncodingScheme getQualifierEncodingScheme(Scan s) {
+         byte[] serialized = s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME);
+         // null check for backward compatibility: the attribute may not have been set on the scan
+         return serialized == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS
+                 : QualifierEncodingScheme.fromSerializedValue(serialized[0]);
+     }
+ 
+     /**
+      * Reads the immutable storage scheme serialized into the scan attributes.
+      *
+      * @param s the scan to read from
+      * @return the serialized scheme, or {@code ONE_CELL_PER_COLUMN} when the attribute is absent
+      */
+     public static ImmutableStorageScheme getImmutableStorageScheme(Scan s) {
+         byte[] serialized = s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME);
+         // null check for backward compatibility: the attribute may not have been set on the scan
+         return serialized == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN
+                 : ImmutableStorageScheme.fromSerializedValue(serialized[0]);
+     }
+ 
+     /**
+      * @return pair of byte arrays. The first part of the pair is the empty key value's column qualifier, and the second
+      * part is the value to use for it.
+      */
+     public static Pair<byte[], byte[]> getEmptyKeyValueInfo(PTable table) {
+         return getEmptyKeyValueInfo(usesEncodedColumnNames(table));
+     }
+ 
+     /**
+      * @return pair of byte arrays. The first part of the pair is the empty key value's column qualifier, and the second
+      * part is the value to use for it.
+      */
+     public static Pair<byte[], byte[]> getEmptyKeyValueInfo(boolean usesEncodedColumnNames) {
+         return usesEncodedColumnNames
+                 ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES, QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES)
+                 : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+     }
+ 
+     /**
+      * @return pair of byte arrays. The first part of the pair is the empty key value's column qualifier, and the second
+      * part is the value to use for it.
+      */
+     public static Pair<byte[], byte[]> getEmptyKeyValueInfo(QualifierEncodingScheme encodingScheme) {
+         return getEmptyKeyValueInfo(usesEncodedColumnNames(encodingScheme));
+     }
+ 
+     /**
+      * Reads the min/max encoded qualifier bounds that were serialized into the scan attributes.
+      *
+      * @param scan the scan to read from
+      * @return {@code null} when the MIN_QUALIFIER attribute is absent; otherwise a pair of (min, max) where max is
+      *         {@code null} if the MAX_QUALIFIER attribute is absent
+      */
+     public static Pair<Integer, Integer> getMinMaxQualifiersFromScan(Scan scan) {
+         byte[] minQualifier = scan.getAttribute(BaseScannerRegionObserver.MIN_QUALIFIER);
+         if (minQualifier == null) {
+             return null;
+         }
+         byte[] maxQualifier = scan.getAttribute(BaseScannerRegionObserver.MAX_QUALIFIER);
+         Integer minQ = Bytes.toInt(minQualifier);
+         Integer maxQ = maxQualifier == null ? null : Integer.valueOf(Bytes.toInt(maxQualifier));
+         return new Pair<>(minQ, maxQ);
+     }
+ 
+     /**
+      * Returns whether qualifier ranges should be set for scans over {@code table}: the table must store one cell
+      * per column, use encoded column names, be non-transactional and have no dynamic columns.
+      */
+     public static boolean setQualifierRanges(PTable table) {
+         return table.getImmutableStorageScheme() == ImmutableStorageScheme.ONE_CELL_PER_COLUMN
+                 && usesEncodedColumnNames(table)
+                 && !table.isTransactional()
+                 && !ScanUtil.hasDynamicColumns(table);
+     }
+ 
+     /**
+      * Returns whether qualifiers can be used as an index, i.e. whether min/max qualifier bounds are available.
+      */
+     public static boolean useQualifierAsIndex(Pair<Integer, Integer> minMaxQualifiers) {
+         return minMaxQualifiers != null;
+     }
+ 
+     /**
+      * Computes, per column family, the inclusive range of encoded column qualifiers derived from the table's
+      * encoded CQ counter.
+      *
+      * @param table a table that uses encoded column qualifiers
+      * @return map from family name to (lowerBound, upperBound); empty if the table has no encoded CQ counter
+      * @throws NullPointerException     if {@code table} is null
+      * @throws IllegalArgumentException if the table uses {@code NON_ENCODED_QUALIFIERS}
+      */
+     public static Map<String, Pair<Integer, Integer>> getFamilyQualifierRanges(PTable table) {
+         checkNotNull(table);
+         QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+         Preconditions.checkArgument(encodingScheme != NON_ENCODED_QUALIFIERS,
+                 "Qualifier ranges are only defined for tables that use encoded column qualifiers");
+         if (table.getEncodedCQCounter() == null) {
+             return Collections.emptyMap();
+         }
+         Map<String, Integer> values = table.getEncodedCQCounter().values();
+         Map<String, Pair<Integer, Integer>> toReturn = Maps.newHashMapWithExpectedSize(values.size());
+         for (Entry<String, Integer> e : values.entrySet()) {
+             // presumably the counter holds the next qualifier to hand out, so the last assigned one is value - 1
+             Integer upperBound = e.getValue() - 1;
+             // Clamp so the range is never inverted when the counter has not advanced past the initial value.
+             Integer lowerBound = Math.min(QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE, upperBound);
+             toReturn.put(e.getKey(), new Pair<>(lowerBound, upperBound));
+         }
+         return toReturn;
+     }
+ 
+     /**
+      * Returns the column qualifier bytes for a column of {@code table}.
+      *
+      * @see #getColumnQualifierBytes(String, Integer, QualifierEncodingScheme, boolean)
+      */
+     public static byte[] getColumnQualifierBytes(String columnName, Integer numberBasedQualifier, PTable table, boolean isPk) {
+         QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+         return getColumnQualifierBytes(columnName, numberBasedQualifier, encodingScheme, isPk);
+     }
+ 
+     /**
+      * Returns the column qualifier bytes for a column.
+      *
+      * @param columnName           the column name, used as the qualifier when qualifiers are not encoded
+      * @param numberBasedQualifier the number to encode when qualifiers are encoded
+      * @param encodingScheme       the qualifier encoding scheme, may be {@code null}
+      * @param isPk                 whether the column is a primary key column
+      * @return {@code null} for PK columns; the name's bytes when qualifiers are not encoded; otherwise the encoded
+      *         number
+      */
+     public static byte[] getColumnQualifierBytes(String columnName, Integer numberBasedQualifier, QualifierEncodingScheme encodingScheme, boolean isPk) {
+         if (isPk) {
+             // Primary key columns have no column qualifier.
+             return null;
+         }
+         if (!usesEncodedColumnNames(encodingScheme)) {
+             return Bytes.toBytes(columnName);
+         }
+         return encodingScheme.encode(numberBasedQualifier);
+     }
+ 
+     /**
+      * Creates an expression array indexed by encoded column qualifier.
+      * <p>
+      * Slot 0 is reserved and holds a non-null literal; every other slot initially holds one shared expression
+      * whose {@code evaluate} always returns {@code false}.
+      *
+      * @param maxEncodedColumnQualifier the largest encoded column qualifier that must fit into the array
+      * @return an array of {@code maxEncodedColumnQualifier - ENCODED_CQ_COUNTER_INITIAL_VALUE + 2} expressions
+      */
+     public static Expression[] createColumnExpressionArray(int maxEncodedColumnQualifier) {
+         // reserve the first position and offset maxEncodedColumnQualifier by ENCODED_CQ_COUNTER_INITIAL_VALUE (which is the minimum encoded column qualifier)
+         int numElements = maxEncodedColumnQualifier - QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE + 2;
+         Expression[] colValues = new Expression[numElements];
+         Arrays.fill(colValues, new DelegateExpression(LiteralExpression.newConstant(null)) {
+             @Override
+             public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+                 return false;
+             }
+         });
+         // 0 is a reserved position, set it to a non-null value so that we can represent absence of a value using a negative offset
+         colValues[0] = LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+         return colValues;
+     }
+ 
+     /**
+      * Returns whether {@code number} falls below the first assignable encoded column qualifier.
+      *
+      * @param number a non-negative column qualifier number
+      * @return {@code true} if {@code number < ENCODED_CQ_COUNTER_INITIAL_VALUE}
+      * @throws IllegalArgumentException if {@code number} is negative
+      */
+     public static boolean isReservedColumnQualifier(int number) {
+         if (number < 0) {
+             throw new IllegalArgumentException("Negative column qualifier " + number + " not allowed");
+         }
+         return number < QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+     }
+ }