You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:35 UTC
[34/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
new file mode 100644
index 0000000..8946c9f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.*;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.*;
+import org.apache.phoenix.schema.TableRef;
+
+/**
+ * Query plan that sets the statement's scan ranges to {@link ScanRanges#NOTHING},
+ * reports an empty list of splits and does not create a scanner.
+ */
+public class DegenerateQueryPlan extends BasicQueryPlan {
+
+    /**
+     * Builds the plan with the empty projector, the empty parameter meta data, empty
+     * ORDER BY and GROUP BY and {@code null} for the remaining arguments, then marks
+     * the context as scanning nothing.
+     *
+     * @param context the statement context whose scan ranges are reset
+     * @param statement the statement being planned
+     * @param table the table the statement refers to
+     */
+    public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
+        super(
+            context,
+            statement,
+            table,
+            RowProjector.EMPTY_PROJECTOR,
+            PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA,
+            null,
+            OrderBy.EMPTY_ORDER_BY,
+            GroupBy.EMPTY_GROUP_BY,
+            null);
+        context.setScanRanges(ScanRanges.NOTHING);
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<KeyRange> getSplits() {
+        return Collections.<KeyRange>emptyList();
+    }
+
+    /**
+     * @return always {@code null}; no scanner is created for this plan
+     */
+    @Override
+    protected Scanner newScanner(ConnectionQueryServices services) throws SQLException {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/MutationState.java b/src/main/java/org/apache/phoenix/execute/MutationState.java
new file mode 100644
index 0000000..3fa26e4
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -0,0 +1,435 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.ServerUtil;
+
+/**
+ *
+ * Tracks the uncommitted state
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MutationState implements SQLCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
+
+ private PhoenixConnection connection;
+ private final long maxSize;
+ private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
+ private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+ private final long sizeOffset;
+ private int numEntries = 0;
+
+    /**
+     * Creates an empty state with a size offset of zero.
+     *
+     * @param maxSize maximum number of rows this state may hold
+     * @param connection the connection the mutations belong to
+     */
+    public MutationState(int maxSize, PhoenixConnection connection) {
+        this(maxSize, connection, 0);
+    }
+
+    /**
+     * Creates an empty state.
+     *
+     * @param maxSize maximum number of rows this state may hold
+     * @param connection the connection the mutations belong to
+     * @param sizeOffset value added to the row count when reporting the update count
+     */
+    public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
+        this.connection = connection;
+        this.sizeOffset = sizeOffset;
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * Creates a state holding the given rows for a single table.
+     *
+     * @param table the table the rows belong to
+     * @param mutations rows keyed by row key; the map is stored as-is, not copied
+     * @param sizeOffset value added to the row count when reporting the update count
+     * @param maxSize maximum number of rows this state may hold
+     * @param connection the connection the mutations belong to
+     * @throws IllegalArgumentException if the number of rows exceeds {@code maxSize}
+     */
+    public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
+        this.connection = connection;
+        this.sizeOffset = sizeOffset;
+        this.maxSize = maxSize;
+        this.mutations.put(table, mutations);
+        this.numEntries = mutations.size();
+        throwIfTooBig();
+    }
+
+    /**
+     * Creates a state from a list of per-table entries, summing the row counts of
+     * every entry in the list.
+     *
+     * @throws IllegalArgumentException if the summed row count exceeds {@code maxSize}
+     */
+    private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
+        this.connection = connection;
+        this.sizeOffset = sizeOffset;
+        this.maxSize = maxSize;
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> tableEntry : entries) {
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> rows = tableEntry.getValue();
+            numEntries += rows.size();
+            this.mutations.put(tableEntry.getKey(), rows);
+        }
+        throwIfTooBig();
+    }
+
+    /**
+     * Rejects the state when it tracks more rows than {@code maxSize} allows.
+     *
+     * @throws IllegalArgumentException if {@code numEntries} is greater than {@code maxSize}
+     */
+    private void throwIfTooBig() {
+        if (numEntries <= maxSize) {
+            return;
+        }
+        // TODO: throw SQLException ?
+        throw new IllegalArgumentException("MutationState size of " + numEntries + " is bigger than max allowed size of " + maxSize);
+    }
+
+    /**
+     * @return the number of rows currently tracked plus the size offset this state
+     *         was created with
+     */
+    public long getUpdateCount() {
+        final long trackedRows = numEntries;
+        return sizeOffset + trackedRows;
+    }
+ /**
+ * Combine a newer mutation with this one, where in the event of overlaps,
+ * the newer one will take precedence.
+ * @param newMutation the newer mutation
+ */
+ public void join(MutationState newMutation) {
+ if (this == newMutation) { // Doesn't make sense
+ return;
+ }
+ // Merge newMutation with this one, keeping state from newMutation for any overlaps
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
+ // Replace existing entries for the table with new entries
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(entry.getKey(), entry.getValue());
+ if (existingRows != null) { // Rows for that table already exist
+ // Loop through new rows and replace existing with new
+ for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+ // Replace existing row with new row
+ Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+ if (existingValues != null) {
+ Map<PColumn,byte[]> newRow = rowEntry.getValue();
+ // if new row is null, it means delete, and we don't need to merge it with existing row.
+ if (newRow != null) {
+ // Replace existing column values with new column values
+ for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) {
+ existingValues.put(valueEntry.getKey(), valueEntry.getValue());
+ }
+ // Now that the existing row has been merged with the new row, replace it back
+ // again (since it was replaced with the new one above).
+ existingRows.put(rowEntry.getKey(), existingValues);
+ }
+ } else {
+ numEntries++;
+ }
+ }
+ // Put the existing one back now that it's merged
+ this.mutations.put(entry.getKey(), existingRows);
+ } else {
+ numEntries += entry.getValue().size();
+ }
+ }
+ throwIfTooBig();
+ }
+
+    /**
+     * Converts the pending rows of one table into HBase mutations and returns an
+     * iterator over (table name, mutations) pairs. The first pair always carries the
+     * data table mutations; each following pair carries the mutations generated for
+     * one non-disabled index of the table. Index mutations are generated lazily, when
+     * the corresponding pair is requested.
+     *
+     * @param tableRef the table the rows belong to
+     * @param values pending rows keyed by row key; a {@code null} value means delete
+     * @param timestamp time stamp passed to each new row
+     * @param includeMutableIndexes whether indexes are also produced for tables whose
+     *        rows are not immutable
+     * @return iterator over the data table batch followed by the index batches
+     */
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) {
+        final List<Mutation> dataMutations = Lists.newArrayListWithExpectedSize(values.size());
+        for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : values.entrySet()) {
+            PRow row = tableRef.getTable().newRow(timestamp, rowEntry.getKey());
+            Map<PColumn,byte[]> columnValues = rowEntry.getValue();
+            if (columnValues != null) {
+                for (Map.Entry<PColumn,byte[]> columnEntry : columnValues.entrySet()) {
+                    row.setValue(columnEntry.getKey(), columnEntry.getValue());
+                }
+            } else { // means delete
+                row.delete();
+            }
+            dataMutations.addAll(row.toRowMutations());
+        }
+        // Only maintain tables with immutable rows through this client-side mechanism
+        final Iterator<PTable> indexes;
+        if (tableRef.getTable().isImmutableRows() || includeMutableIndexes) {
+            indexes = IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator());
+        } else {
+            indexes = Iterators.<PTable>emptyIterator();
+        }
+        return new Iterator<Pair<byte[],List<Mutation>>>() {
+            // True until the data table pair has been handed out
+            private boolean dataTablePending = true;
+
+            @Override
+            public boolean hasNext() {
+                return dataTablePending || indexes.hasNext();
+            }
+
+            @Override
+            public Pair<byte[], List<Mutation>> next() {
+                if (!dataTablePending) {
+                    PTable index = indexes.next();
+                    List<Mutation> indexMutations;
+                    try {
+                        indexMutations = IndexUtil.generateIndexData(tableRef.getTable(), index, dataMutations, tempPtr);
+                    } catch (SQLException e) {
+                        throw new IllegalDataException(e);
+                    }
+                    return new Pair<byte[],List<Mutation>>(index.getName().getBytes(), indexMutations);
+                }
+                dataTablePending = false;
+                return new Pair<byte[],List<Mutation>>(tableRef.getTable().getName().getBytes(), dataMutations);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+        };
+    }
+
+    /**
+     * Get the unsorted list of HBase mutations for the tables with uncommitted data.
+     * Equivalent to {@code toMutations(false)}.
+     * @return list of HBase mutations for uncommitted data.
+     */
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
+        return toMutations(false);
+    }
+
+    /**
+     * Get the unsorted HBase mutations for every table with uncommitted data. For each
+     * table the data table batch is returned first, followed by its index batches.
+     * The connection's SCN is used as the time stamp when set, otherwise
+     * {@link HConstants#LATEST_TIMESTAMP}.
+     *
+     * @param includeMutableIndexes whether index mutations are also produced for tables
+     *        whose rows are not immutable
+     * @return iterator over (table name, mutations) pairs; empty when nothing is pending
+     */
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
+        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+        if (!iterator.hasNext()) {
+            return Iterators.emptyIterator();
+        }
+        Long scn = connection.getSCN();
+        final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+        return new Iterator<Pair<byte[],List<Mutation>>>() {
+            private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next();
+            private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
+
+            // Builds the per-table iterator for the table currently pointed to
+            private Iterator<Pair<byte[],List<Mutation>>> init() {
+                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes);
+            }
+
+            @Override
+            public boolean hasNext() {
+                return innerIterator.hasNext() || iterator.hasNext();
+            }
+
+            @Override
+            public Pair<byte[], List<Mutation>> next() {
+                if (!innerIterator.hasNext()) {
+                    // Current table is exhausted: advance to the next table and rebuild
+                    // the inner iterator for it. Without the rebuild the exhausted
+                    // iterator of the previous table would be asked for another element.
+                    current = iterator.next();
+                    innerIterator = init();
+                }
+                return innerIterator.next();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+        };
+    }
+
+ /**
+ * Validates that the meta data is still valid based on the current server time
+ * and returns the time stamp to use for the upsert for each table, in the iteration
+ * order of the pending mutations map.
+ * <p>
+ * When the connection is not in auto-commit mode, the table is refreshed through
+ * MetaDataClient.updateCache. A negative result is negated and triggers a check that
+ * every column referenced by the pending rows can still be looked up on the table held
+ * in the connection's meta data (presumably a negative value signals that the cached
+ * table was updated - TODO confirm). When the connection has an SCN, the SCN is returned
+ * in place of the server time stamp.
+ * @return the time stamp to use for the upsert, one element per table
+ * @throws SQLException if the table or any columns no longer exist
+ */
+ private long[] validate() throws SQLException {
+ int i = 0;
+ Long scn = connection.getSCN();
+ MetaDataClient client = new MetaDataClient(connection);
+ long[] timeStamps = new long[this.mutations.size()];
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
+ TableRef tableRef = entry.getKey();
+ // Default to the time stamp recorded on the table reference; used as-is in auto-commit mode
+ long serverTimeStamp = tableRef.getTimeStamp();
+ PTable table = tableRef.getTable();
+ if (!connection.getAutoCommit()) {
+ serverTimeStamp = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+ if (serverTimeStamp < 0) {
+ // Restore the positive time stamp before using it
+ serverTimeStamp *= -1;
+ // TODO: use bitset?
+ // Collect, by column position, every column that any pending row sets
+ PColumn[] columns = new PColumn[table.getColumns().size()];
+ for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+ Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
+ // A null value map means the row is a delete, which references no columns
+ if (valueEntry != null) {
+ for (PColumn column : valueEntry.keySet()) {
+ columns[column.getPosition()] = column;
+ }
+ }
+ }
+ // Re-read the table from the connection's meta data and look up each collected column.
+ // The lookup result is discarded; the call is made only so that it can fail.
+ table = connection.getPMetaData().getTable(tableRef.getTable().getName().getString());
+ for (PColumn column : columns) {
+ if (column != null) {
+ table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+ }
+ }
+ }
+ }
+ // An SCN on the connection overrides the server time stamp
+ timeStamps[i++] = scn == null ? serverTimeStamp : scn;
+ }
+ return timeStamps;
+ }
+
+    /**
+     * Logs, at debug level, how many mutations are about to be sent to the given table
+     * together with the number of key values they contain and the summed length of the
+     * key values' backing buffers.
+     *
+     * @param htable the table the mutations are sent to
+     * @param mutations the mutations being sent
+     */
+    private static void logMutationSize(HTableInterface htable, List<Mutation> mutations) {
+        long totalBytes = 0;
+        int kvCount = 0;
+        for (Mutation mutation : mutations) {
+            if (mutation.getFamilyMap() == null) { // Delete of the row
+                continue;
+            }
+            for (Entry<byte[], List<KeyValue>> familyEntry : mutation.getFamilyMap().entrySet()) {
+                List<KeyValue> keyValues = familyEntry.getValue();
+                if (keyValues == null) {
+                    continue;
+                }
+                for (KeyValue kv : keyValues) {
+                    totalBytes += kv.getBuffer().length;
+                    kvCount++;
+                }
+            }
+        }
+        logger.debug("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + kvCount + " key values of total size " + totalBytes + " bytes");
+    }
+
+ /**
+ * Sends the pending mutations to HBase, one table at a time, and removes each table
+ * from this state once all of its batches have been sent.
+ * <p>
+ * For every table the data table batch is sent first, followed by the batches produced
+ * by addRowMutations for its indexes. When the table has index maintainers, every data
+ * table mutation is tagged with an index UUID and either relies on a server side index
+ * metadata cache or carries the index metadata directly as an attribute. A batch that
+ * used the cache and fails with INDEX_METADATA_NOT_FOUND is retried once after clearing
+ * the table region cache.
+ * @throws SQLException a CommitException when a batch fails, or the parsed server
+ * exception when closing the HBase table fails
+ */
+ public void commit() throws SQLException {
+ int i = 0;
+ byte[] tenantId = connection.getTenantId();
+ // One time stamp per table, in the same iteration order as the loop below
+ long[] serverTimeStamps = validate();
+ Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+ List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
+ while (iterator.hasNext()) {
+ Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
+ TableRef tableRef = entry.getKey();
+ PTable table = tableRef.getTable();
+ // tempPtr is populated by the table; a non-zero length is taken to mean index maintainers exist
+ table.getIndexMaintainers(tempPtr);
+ boolean hasIndexMaintainers = tempPtr.getLength() > 0;
+ // The first batch returned by addRowMutations is the data table; later ones are indexes
+ boolean isDataTable = true;
+ long serverTimestamp = serverTimeStamps[i++];
+ Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
+ while (mutationsIterator.hasNext()) {
+ Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+ byte[] htableName = pair.getFirst();
+ List<Mutation> mutations = pair.getSecond();
+
+ int retryCount = 0;
+ boolean shouldRetry = false;
+ do {
+ ServerCache cache = null;
+ if (hasIndexMaintainers && isDataTable) {
+ byte[] attribValue = null;
+ byte[] uuidValue;
+ if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
+ IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+ cache = client.addIndexMetadataCache(mutations, tempPtr);
+ uuidValue = cache.getId();
+ // If we haven't retried yet, retry for this case only, as it's possible that
+ // a split will occur after we send the index metadata cache to all known
+ // region servers.
+ shouldRetry = true;
+ } else {
+ attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
+ uuidValue = ServerCacheClient.generateId();
+ }
+ // Either set the UUID to be able to access the index metadata from the cache
+ // or set the index metadata directly on the Mutation
+ for (Mutation mutation : mutations) {
+ if (tenantId != null) {
+ mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ if (attribValue != null) {
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ }
+ }
+ }
+
+ // Holds the exception to rethrow once the table and cache have been closed
+ SQLException sqlE = null;
+ HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+ try {
+ if (logger.isDebugEnabled()) logMutationSize(hTable, mutations);
+ long startTime = System.currentTimeMillis();
+ hTable.batch(mutations);
+ // Success: make sure the do/while loop terminates
+ shouldRetry = false;
+ if (logger.isDebugEnabled()) logger.debug("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms");
+ // NOTE(review): this runs once per successful batch, so a table with index batches
+ // is added to committedList more than once - confirm whether that is intended.
+ committedList.add(entry);
+ } catch (Exception e) {
+ SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+ if (inferredE != null) {
+ if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+ // Swallow this exception once, as it's possible that we split after sending the index metadata
+ // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+ // If it fails again, we don't retry.
+ logger.warn("Swallowing exception and retrying after clearing meta cache on connection. " + inferredE);
+ connection.getQueryServices().clearTableRegionCache(htableName);
+ // The finally block below still runs before the loop condition is evaluated
+ continue;
+ }
+ e = inferredE;
+ }
+ // Throw to client with both what was committed so far and what is left to be committed.
+ // That way, client can either undo what was done or try again with what was not done.
+ sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
+ } finally {
+ try {
+ hTable.close();
+ } catch (IOException e) {
+ // Chain the close failure onto an earlier failure, or report it on its own
+ if (sqlE != null) {
+ sqlE.setNextException(ServerUtil.parseServerException(e));
+ } else {
+ sqlE = ServerUtil.parseServerException(e);
+ }
+ } finally {
+ try {
+ if (cache != null) {
+ cache.close();
+ }
+ } finally {
+ // Rethrow only after both the table and the cache have been closed
+ if (sqlE != null) {
+ throw sqlE;
+ }
+ }
+ }
+ }
+ // retryCount is incremented only when shouldRetry is still true, allowing a single retry
+ } while (shouldRetry && retryCount++ < 1);
+ isDataTable = false;
+ }
+ numEntries -= entry.getValue().size();
+ iterator.remove(); // Remove batches as we process them
+ }
+ assert(numEntries==0);
+ assert(this.mutations.isEmpty());
+ }
+
+    /**
+     * Discards every pending mutation and resets the row count to zero.
+     *
+     * @param connection not used by this implementation
+     * @throws SQLException declared for callers; not thrown by this implementation
+     */
+    public void rollback(PhoenixConnection connection) throws SQLException {
+        numEntries = 0;
+        this.mutations.clear();
+    }
+
+ /**
+ * Does nothing; this implementation releases no resources.
+ */
+ @Override
+ public void close() throws SQLException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/src/main/java/org/apache/phoenix/execute/ScanPlan.java
new file mode 100644
index 0000000..2913b80
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
+import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.query.WrappedScanner;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ *
+ * Query plan for a basic table scan
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanPlan extends BasicQueryPlan {
+    // Assigned by newScanner(); stays null until a scanner has been created
+    private List<KeyRange> splits;
+
+    /**
+     * Creates the plan. When no parallel iterator factory is supplied, a spooling
+     * result iterator factory built from the connection's query services is used.
+     * When the ORDER BY has expressions (TopN), the spool threshold, the limit
+     * ({@code -1} when absent), the ORDER BY expressions and the estimated row size
+     * are serialized into the scan.
+     */
+    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) {
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null,
+                parallelIteratorFactory == null
+                        ? new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices())
+                        : parallelIteratorFactory);
+        boolean isTopN = !orderBy.getOrderByExpressions().isEmpty();
+        if (isTopN) {
+            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            ScanRegionObserver.serializeIntoScan(
+                    context.getScan(),
+                    thresholdBytes,
+                    limit == null ? -1 : limit,
+                    orderBy.getOrderByExpressions(),
+                    projector.getEstimatedRowByteSize());
+        }
+    }
+
+    /**
+     * @return the splits computed by the most recent call to newScanner, or
+     *         {@code null} if no scanner has been created yet
+     */
+    @Override
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    /**
+     * Builds the scanner for this plan and records the splits of the parallel iterators.
+     * An ordered query is merge sorted as TopN. Otherwise the results are merge sorted
+     * by row key for a salted table when configured to do so, or when the ORDER BY is
+     * the row key ORDER BY, and concatenated in all other cases; a limit, when present,
+     * is then applied on the client.
+     */
+    @Override
+    protected Scanner newScanner(ConnectionQueryServices services) throws SQLException {
+        // Set any scan attributes before creating the scanner, as it will be too late afterwards
+        context.getScan().setAttribute(ScanRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
+        TableRef tableRef = this.getTableRef();
+        PTable table = tableRef.getTable();
+        boolean isSalted = table.getBucketNum() != null;
+        /* If no limit or topN, use parallel iterator so that we get results faster. Otherwise, if
+         * limit is provided, run query serially.
+         */
+        boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, GroupBy.EMPTY_GROUP_BY, isOrdered ? null : limit, parallelIteratorFactory);
+        splits = iterators.getSplits();
+
+        if (isOrdered) {
+            ResultIterator topN = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
+            return new WrappedScanner(topN, getProjector());
+        }
+
+        // ORDER BY may have been optimized out b/c query is in row key order
+        boolean mergeByRowKey = isSalted &&
+                (services.getProps().getBoolean(
+                        QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB,
+                        QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) ||
+                 orderBy == OrderBy.ROW_KEY_ORDER_BY);
+        ResultIterator results;
+        if (mergeByRowKey) {
+            results = new MergeSortRowKeyResultIterator(iterators, SaltingUtil.NUM_SALTING_BYTES);
+        } else {
+            results = new ConcatResultIterator(iterators);
+        }
+        if (limit != null) {
+            results = new LimitingResultIterator(results, limit);
+        }
+        return new WrappedScanner(results, getProjector());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/AddExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/AddExpression.java b/src/main/java/org/apache/phoenix/expression/AddExpression.java
new file mode 100644
index 0000000..fd906c5
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/AddExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ *
+ * Add expression implementation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class AddExpression extends BaseAddSubtractExpression {
+    /** Creates an expression without children. */
+    public AddExpression() {
+    }
+
+    /**
+     * @param children the operands being added
+     */
+    public AddExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Visits the children, then this node, and falls back to the visitor's default
+     * return value when leaving this node yields {@code null}.
+     */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> childResults = acceptChildren(visitor, visitor.visitEnter(this));
+        T result = visitor.visitLeave(this, childResults);
+        if (result != null) {
+            return result;
+        }
+        return visitor.defaultReturn(this, childResults);
+    }
+
+    /**
+     * @return the operator text placed between operands, including surrounding spaces
+     */
+    @Override
+    public String getOperatorString() {
+        return " + ";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/AndExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/AndExpression.java b/src/main/java/org/apache/phoenix/expression/AndExpression.java
new file mode 100644
index 0000000..9a8e933
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/AndExpression.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ *
+ * AND expression implementation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AndExpression extends AndOrExpression {
+    /** Creates an expression without children. */
+    public AndExpression() {
+    }
+
+    /**
+     * @param children the operands of the AND
+     */
+    public AndExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * @return {@code false}, the value at which evaluation of an AND stops
+     */
+    @Override
+    protected boolean getStopValue() {
+        return false;
+    }
+
+    /**
+     * @return the children joined by {@code " AND "} and wrapped in parentheses
+     */
+    @Override
+    public String toString() {
+        final int last = children.size() - 1;
+        StringBuilder text = new StringBuilder("(");
+        for (int i = 0; i < last; i++) {
+            text.append(children.get(i)).append(" AND ");
+        }
+        text.append(children.get(last)).append(')');
+        return text.toString();
+    }
+
+    /**
+     * Visits the children, then this node, and falls back to the visitor's default
+     * return value when leaving this node yields {@code null}.
+     */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> childResults = acceptChildren(visitor, visitor.visitEnter(this));
+        T result = visitor.visitLeave(this, childResults);
+        if (result != null) {
+            return result;
+        }
+        return visitor.defaultReturn(this, childResults);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/AndOrExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/AndOrExpression.java b/src/main/java/org/apache/phoenix/expression/AndOrExpression.java
new file mode 100644
index 0000000..aebd63a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/AndOrExpression.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Abstract expression implementation for compound AND and OR expressions
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class AndOrExpression extends BaseCompoundExpression {
+    // Remember evaluation of child expression for partial evaluation
+    private BitSet partialEvalState;
+
+    /** Creates an expression without children. */
+    public AndOrExpression() {
+    }
+
+    /**
+     * @param children the operands of the compound expression
+     */
+    public AndOrExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Combines the parent's hash code with the hash of the stop value, so that
+     * subclasses with different stop values hash differently.
+     */
+    @Override
+    public int hashCode() {
+        int stopValueHash = Boolean.valueOf(this.getStopValue()).hashCode();
+        return 31 * super.hashCode() + stopValueHash;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.BOOLEAN;
+    }
+
+    /**
+     * Clears the partial evaluation state, allocating it on first use, and then
+     * resets the parent.
+     */
+    @Override
+    public void reset() {
+        if (partialEvalState != null) {
+            partialEvalState.clear();
+        } else {
+            partialEvalState = new BitSet(children.size());
+        }
+        super.reset();
+    }
+
+    /**
+     * Evaluates the children in order. Returns {@code true} as soon as a child
+     * evaluates to the stop value. Children that evaluated to a non-stop value are
+     * remembered in the partial evaluation state (when it exists) and skipped on later
+     * calls. If any child could not be evaluated, returns {@code false}; otherwise
+     * returns {@code true}.
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        boolean sawUnevaluatedChild = false;
+        final boolean stopValue = getStopValue();
+        final int childCount = children.size();
+        for (int i = 0; i < childCount; i++) {
+            Expression child = children.get(i);
+            // If partial state is available, then use that to know we've already evaluated this
+            // child expression and do not need to do so again.
+            if (partialEvalState != null && partialEvalState.get(i)) {
+                continue;
+            }
+            // Call through to child evaluate method matching parent call to allow child to optimize
+            // evaluate versus getValue code path.
+            if (!child.evaluate(tuple, ptr)) {
+                sawUnevaluatedChild = true;
+                continue;
+            }
+            // Short circuit if we see our stop value
+            if (Boolean.valueOf(stopValue).equals(PDataType.BOOLEAN.toObject(ptr, child.getDataType()))) {
+                return true;
+            }
+            if (partialEvalState != null) {
+                partialEvalState.set(i);
+            }
+        }
+        return !sawUnevaluatedChild;
+    }
+
+    /**
+     * @return the boolean value at which evaluation short circuits
+     */
+    protected abstract boolean getStopValue();
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java b/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java
new file mode 100644
index 0000000..622d709
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+public abstract class ArithmeticExpression extends BaseCompoundExpression {
+
+ public ArithmeticExpression() {
+ }
+
+ public ArithmeticExpression(List<Expression> children) {
+ super(children);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder("(");
+ for (int i = 0; i < children.size() - 1; i++) {
+ buf.append(children.get(i) + getOperatorString());
+ }
+ buf.append(children.get(children.size()-1));
+ buf.append(')');
+ return buf.toString();
+ }
+
+ abstract protected String getOperatorString();
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java b/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java
new file mode 100644
index 0000000..1b9e4e5
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+import org.apache.phoenix.schema.PDataType;
+
+
+abstract public class BaseAddSubtractExpression extends ArithmeticExpression {
+    public BaseAddSubtractExpression() {
+    }
+
+    public BaseAddSubtractExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Computes the precision of the result of adding or subtracting two decimals.
+     *
+     * @param lp precision of the left operand, or null if unknown
+     * @param rp precision of the right operand, or null if unknown
+     * @param ls scale of the left operand, or null if unknown
+     * @param rs scale of the right operand, or null if unknown
+     * @return the result precision capped at PDataType.MAX_PRECISION; MAX_PRECISION
+     *         itself when any precision or scale is unknown
+     */
+    protected static Integer getPrecision(Integer lp, Integer rp, Integer ls, Integer rs) {
+        // The arithmetic below unboxes all four values, so an unknown precision must be
+        // handled the same way as an unknown scale instead of failing with an NPE.
+        if (lp == null || rp == null || ls == null || rs == null) {
+            return PDataType.MAX_PRECISION;
+        }
+        int val = getScale(lp, rp, ls, rs) + Math.max(lp - ls, rp - rs) + 1;
+        return Math.min(PDataType.MAX_PRECISION, val);
+    }
+
+    /**
+     * Computes the scale of the result of adding or subtracting two decimals.
+     *
+     * @param lp precision of the left operand (not used by this computation)
+     * @param rp precision of the right operand (not used by this computation)
+     * @param ls scale of the left operand, or null if unknown
+     * @param rs scale of the right operand, or null if unknown
+     * @return the larger of the two scales capped at PDataType.MAX_PRECISION, or null
+     *         when either scale is unknown
+     */
+    protected static Integer getScale(Integer lp, Integer rp, Integer ls, Integer rs) {
+        // If we are adding a decimal with scale and precision to a decimal
+        // with no precision nor scale, the scale system does not apply.
+        if (ls == null || rs == null) {
+            return null;
+        }
+        int val = Math.max(ls, rs);
+        return Math.min(PDataType.MAX_PRECISION, val);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java b/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java
new file mode 100644
index 0000000..814dcaf
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+public abstract class BaseCompoundExpression extends BaseExpression {
+    /** Immutable list of child expressions; null until the list constructor or readFields runs. */
+    protected List<Expression> children;
+    /** True when at least one child reports itself as nullable. */
+    private boolean isNullable;
+
+    public BaseCompoundExpression() {
+    }
+
+    /**
+     * Stores an immutable copy of the children and records whether any of them is nullable.
+     *
+     * @param children the child expressions
+     */
+    public BaseCompoundExpression(List<Expression> children) {
+        this.children = ImmutableList.copyOf(children);
+        for (int i = 0; i < children.size(); i++) {
+            Expression child = children.get(i);
+            if (child.isNullable()) {
+                isNullable = true;
+            }
+        }
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return children;
+    }
+
+    @Override
+    public boolean isNullable() {
+        return isNullable;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + children.hashCode();
+        return result;
+    }
+
+    /** Two compound expressions are equal when they have the same class and equal children. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        BaseCompoundExpression other = (BaseCompoundExpression)obj;
+        if (!children.equals(other.children)) return false;
+        return true;
+    }
+
+    /**
+     * Reads a child count followed by, for each child, its ExpressionType ordinal and
+     * its serialized fields. Nullability is recomputed from the children just read, so
+     * calling this again on the same instance does not carry over the previous state.
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int len = WritableUtils.readVInt(input);
+        List<Expression> deserialized = new ArrayList<Expression>(len);
+        // values() clones its array on every call; fetch it once for the whole loop.
+        ExpressionType[] types = ExpressionType.values();
+        boolean anyNullable = false;
+        for (int i = 0; i < len; i++) {
+            Expression child = types[WritableUtils.readVInt(input)].newInstance();
+            child.readFields(input);
+            anyNullable |= child.isNullable();
+            deserialized.add(child);
+        }
+        this.children = ImmutableList.copyOf(deserialized);
+        this.isNullable = anyNullable;
+    }
+
+    /** Writes the child count followed by each child's ExpressionType ordinal and fields. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        WritableUtils.writeVInt(output, children.size());
+        for (int i = 0; i < children.size(); i++) {
+            Expression child = children.get(i);
+            WritableUtils.writeVInt(output, ExpressionType.valueOf(child).ordinal());
+            child.write(output);
+        }
+    }
+
+    /** Resets every child. */
+    @Override
+    public void reset() {
+        for (int i = 0; i < children.size(); i++) {
+            children.get(i).reset();
+        }
+    }
+
+    /**
+     * Visits the children, then lets the visitor produce a result for this node,
+     * falling back to the visitor's default when visitLeave returns null.
+     */
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getName() + " [children=" + children + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java b/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java
new file mode 100644
index 0000000..4f641fe
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java
@@ -0,0 +1,5 @@
+package org.apache.phoenix.expression;
+
+public class BaseDecimalAddSubtractExpression { // Empty placeholder: declares no members and extends no project class.
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseExpression.java b/src/main/java/org/apache/phoenix/expression/BaseExpression.java
new file mode 100644
index 0000000..3636edb
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseExpression.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.ColumnModifier;
+
+
+
+/**
+ *
+ * Base class for Expression hierarchy that provides common
+ * default implementations for most methods
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseExpression implements Expression {
+    /** Not nullable unless a subclass overrides this. */
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    /** Byte size of the data type when it is fixed width, otherwise null. */
+    @Override
+    public Integer getByteSize() {
+        if (getDataType().isFixedWidth()) {
+            return getDataType().getByteSize();
+        }
+        return null;
+    }
+
+    /** No maximum length by default. */
+    @Override
+    public Integer getMaxLength() {
+        return null;
+    }
+
+    /** No scale by default. */
+    @Override
+    public Integer getScale() {
+        return null;
+    }
+
+    /** No column modifier by default. */
+    @Override
+    public ColumnModifier getColumnModifier() {
+        return null;
+    }
+
+    /** Nothing to deserialize by default. */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+    }
+
+    /** Nothing to serialize by default. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+    }
+
+    /** No state to reset by default. */
+    @Override
+    public void reset() {
+    }
+
+    /**
+     * Applies the visitor to each expression produced by the iterator and gathers
+     * the non-null results.
+     *
+     * @param visitor  the visitor to apply
+     * @param iterator the children to visit; when null, the visitor's default
+     *                 iterator for this expression is used
+     * @return the non-null visit results in iteration order; an empty list if there are none
+     */
+    protected final <T> List<T> acceptChildren(ExpressionVisitor<T> visitor, Iterator<Expression> iterator) {
+        Iterator<Expression> pending = (iterator != null) ? iterator : visitor.defaultIterator(this);
+        List<T> results = Collections.emptyList();
+        while (pending.hasNext()) {
+            T outcome = pending.next().accept(visitor);
+            if (outcome == null) {
+                continue;
+            }
+            // Swap the shared empty list for a real one only when there is something to keep.
+            if (results.isEmpty()) {
+                results = new ArrayList<T>(getChildren().size());
+            }
+            results.add(outcome);
+        }
+        return results;
+    }
+
+    /** Not constant unless a subclass overrides this. */
+    @Override
+    public boolean isConstant() {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java b/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java
new file mode 100644
index 0000000..1758438
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ *
+ * Base class for expressions which have a single child expression
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseSingleExpression extends BaseExpression {
+
+    /** One-element list holding the only child; null until the child constructor or readFields runs. */
+    protected List<Expression> children;
+
+    public BaseSingleExpression() {
+    }
+
+    public BaseSingleExpression(Expression expression) {
+        this.children = ImmutableList.of(expression);
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return children;
+    }
+
+    /** Reads the child's ExpressionType ordinal, instantiates it and reads its fields. */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        final int typeOrdinal = WritableUtils.readVInt(input);
+        final Expression child = ExpressionType.values()[typeOrdinal].newInstance();
+        child.readFields(input);
+        children = ImmutableList.of(child);
+    }
+
+    /** Writes the child's ExpressionType ordinal followed by the child's own fields. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        final Expression child = children.get(0);
+        WritableUtils.writeVInt(output, ExpressionType.valueOf(child).ordinal());
+        child.write(output);
+    }
+
+    /** Nullable exactly when the child is. */
+    @Override
+    public boolean isNullable() {
+        return children.get(0).isNullable();
+    }
+
+    /** Resets the child. */
+    @Override
+    public void reset() {
+        children.get(0).reset();
+    }
+
+    /**
+     * Visits the children through the visitor's default iterator and returns the first
+     * non-null result, or the visitor's default when there is none.
+     */
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> results = acceptChildren(visitor, null);
+        return results.isEmpty() ? visitor.defaultReturn(this, results) : results.get(0);
+    }
+
+    /** Returns the single child expression. */
+    public Expression getChild() {
+        return children.get(0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java b/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java
new file mode 100644
index 0000000..aaa4371
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+
+/**
+ *
+ * Grouping class for expression that have no expression children
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseTerminalExpression extends BaseExpression {
+ @Override
+ public List<Expression> getChildren() { // A terminal expression has no children: always the shared empty list.
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <T> T accept(ExpressionVisitor<T> visitor) { // Default ignores the visitor and returns null; not final, so subclasses may override.
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CaseExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CaseExpression.java b/src/main/java/org/apache/phoenix/expression/CaseExpression.java
new file mode 100644
index 0000000..f36f6a7
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CaseExpression.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.*;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * CASE/WHEN expression implementation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class CaseExpression extends BaseCompoundExpression {
+    private static final int FULLY_EVALUATE = -1;
+
+    // Index of the next result child to examine while partially evaluating, or
+    // FULLY_EVALUATE when reset() has never been called and each call scans every case.
+    private short evalIndex = FULLY_EVALUATE;
+    // Set once a matching WHEN has been found during partial evaluation.
+    private boolean foundIndex;
+    // Common type of all result expressions; null when none of them has a known type.
+    private PDataType returnType;
+
+    public CaseExpression() {
+    }
+
+    /**
+     * Determines the common type of the result expressions (the children at even
+     * indexes) and wraps any result of a different type in a coercion to it.
+     *
+     * @param children expressions laid out as (result, condition)+ [else result]
+     * @return the same list if nothing needed coercing, otherwise a modified copy
+     * @throws SQLException if the result types cannot be coerced to a common type
+     */
+    private static List<Expression> coerceIfNecessary(List<Expression> children) throws SQLException {
+        boolean isChildTypeUnknown = false;
+        PDataType returnType = children.get(0).getDataType();
+        for (int i = 2; i < children.size(); i+=2) {
+            Expression child = children.get(i);
+            PDataType childType = child.getDataType();
+            if (childType == null) {
+                isChildTypeUnknown = true;
+            } else if (returnType == null) {
+                returnType = childType;
+                isChildTypeUnknown = true;
+            } else if (returnType == childType || childType.isCoercibleTo(returnType)) {
+                continue;
+            } else if (returnType.isCoercibleTo(childType)) {
+                returnType = childType;
+            } else {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CONVERT_TYPE)
+                    .setMessage("Case expressions must have common type: " + returnType + " cannot be coerced to " + childType)
+                    .build().buildException();
+            }
+        }
+        // If we found an "unknown" child type and the return type is a number
+        // make the return type be the most general number type of DECIMAL.
+        // returnType is still null when every result type is unknown, so guard
+        // the dereference.
+        if (isChildTypeUnknown && returnType != null && returnType.isCoercibleTo(PDataType.DECIMAL)) {
+            returnType = PDataType.DECIMAL;
+        }
+        List<Expression> newChildren = children;
+        for (int i = 0; i < children.size(); i+=2) {
+            Expression child = children.get(i);
+            PDataType childType = child.getDataType();
+            if (childType != returnType) {
+                // Copy lazily so the caller's list is never modified.
+                if (newChildren == children) {
+                    newChildren = new ArrayList<Expression>(children);
+                }
+                newChildren.set(i, CoerceExpression.create(child, returnType));
+            }
+        }
+        return newChildren;
+    }
+    /**
+     * Construct CASE/WHEN expression
+     * @param expressions list of expressions in the form of:
+     *  ((<result expression>, <boolean expression>)+, [<optional else result expression>])
+     * @throws SQLException if return type of case expressions do not match and cannot
+     *  be coerced to a common type
+     */
+    public CaseExpression(List<Expression> expressions) throws SQLException {
+        super(coerceIfNecessary(expressions));
+        returnType = children.get(0).getDataType();
+    }
+
+    private boolean isPartiallyEvaluating() {
+        return evalIndex != FULLY_EVALUATE;
+    }
+
+    /** An odd number of children means the last one is the ELSE result. */
+    public boolean hasElse() {
+        return children.size() % 2 != 0;
+    }
+
+    @Override
+    public boolean isNullable() {
+        // If any expression is nullable or there's no else clause
+        // return true since null may be returned.
+        if (super.isNullable() || !hasElse()) {
+            return true;
+        }
+        return children.get(children.size()-1).isNullable();
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return returnType;
+    }
+
+    /**
+     * Restarts partial evaluation from the first case and resets every child, so
+     * that children holding their own partial-evaluation state do not carry it
+     * over from the previous use.
+     */
+    @Override
+    public void reset() {
+        foundIndex = false;
+        evalIndex = 0;
+        super.reset();
+    }
+
+    /** Reads the children, then the ordinal of the return type. */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        this.returnType = PDataType.values()[WritableUtils.readVInt(input)];
+    }
+
+    /** Writes the children, then the ordinal of the return type. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        WritableUtils.writeVInt(output, this.returnType.ordinal());
+    }
+
+    /**
+     * Finds the result expression selected for the given tuple.
+     *
+     * @return the index of the selected result child (the ELSE result when no
+     *         condition matched and an ELSE exists); the number of children when
+     *         every condition was examined and none matched; -1 when partially
+     *         evaluating and a condition could not be evaluated yet
+     */
+    public int evaluateIndexOf(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (foundIndex) {
+            return evalIndex;
+        }
+        int size = children.size();
+        // If we're doing partial evaluation, start where we left off
+        for (int i = isPartiallyEvaluating() ? evalIndex : 0; i < size; i+=2) {
+            // Short circuit if we see our stop value
+            if (i+1 == size) {
+                return i;
+            }
+            // If we get null, we have to re-evaluate from that point (special case this in filter, like is null)
+            // We may only run this when we're done/have all values
+            boolean evaluated = children.get(i+1).evaluate(tuple, ptr);
+            if (evaluated && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(ptr))) {
+                if (isPartiallyEvaluating()) {
+                    foundIndex = true;
+                }
+                return i;
+            }
+            if (isPartiallyEvaluating()) {
+                if (evaluated || tuple.isImmutable()) {
+                    evalIndex+=2;
+                } else {
+                    /*
+                     * Return early here if incrementally evaluating and we don't
+                     * have all the key values yet. We can't continue because we'd
+                     * potentially be bypassing cases which we could later evaluate
+                     * once we have more column values.
+                     */
+                    return -1;
+                }
+            }
+        }
+        // No conditions matched, return size to indicate that we were able
+        // to evaluate all cases, but didn't find any matches.
+        return size;
+    }
+
+    /**
+     * Only expression that currently uses the isPartial flag. The IS NULL
+     * expression will use it too. TODO: We could alternatively have a non interface
+     * method, like setIsPartial in which we set to false prior to calling
+     * evaluate.
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        int index = evaluateIndexOf(tuple, ptr);
+        if (index < 0) {
+            return false;
+        } else if (index == children.size()) {
+            // Nothing matched and there is no ELSE: the result is null.
+            ptr.set(PDataType.NULL_BYTES);
+            return true;
+        }
+        if (children.get(index).evaluate(tuple, ptr)) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+
+    /**
+     * Renders the expression as
+     * {@code CASE WHEN <cond> THEN <result> [WHEN ...] [ELSE <result>] END}.
+     */
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("CASE ");
+        for (int i = 0; i < children.size() - 1; i+=2) {
+            // Separate consecutive WHEN clauses; without this the previous THEN
+            // result runs straight into the next WHEN keyword.
+            if (i > 0) {
+                buf.append(' ');
+            }
+            buf.append("WHEN ");
+            buf.append(children.get(i+1));
+            buf.append(" THEN ");
+            buf.append(children.get(i));
+        }
+        if (hasElse()) {
+            buf.append(" ELSE ").append(children.get(children.size()-1));
+        }
+        buf.append(" END");
+        return buf.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java b/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java
new file mode 100644
index 0000000..54758f2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+public class CeilingDecimalExpression extends BaseSingleExpression {
+    private static final MathContext CEILING_CONTEXT = new MathContext(1, RoundingMode.CEILING);
+
+    public CeilingDecimalExpression() {
+    }
+
+    public CeilingDecimalExpression(Expression child) {
+        super(child);
+    }
+
+    /**
+     * Supplies the rounding mode used by {@link #evaluate}. Only the rounding
+     * mode of the returned context is used; its precision is ignored.
+     */
+    protected MathContext getMathContext() {
+        return CEILING_CONTEXT;
+    }
+
+    /**
+     * Evaluates the child and replaces its decimal value with that value rounded
+     * to an integral value using the rounding mode from {@link #getMathContext()}.
+     *
+     * @return false if the child could not be evaluated, true otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression child = getChild();
+        if (child.evaluate(tuple, ptr)) {
+            PDataType childType = child.getDataType();
+            childType.coerceBytes(ptr, childType, child.getColumnModifier(), null);
+            BigDecimal value = (BigDecimal) childType.toObject(ptr);
+            // BigDecimal.round(MathContext) limits the number of SIGNIFICANT digits, so a
+            // precision-1 context turns 12.3 into 2E+1. Dropping the fractional digits
+            // with setScale(0, mode) is what yields the ceiling as an integral value.
+            value = value.setScale(0, getMathContext().getRoundingMode());
+            byte[] b = childType.toBytes(value, child.getColumnModifier());
+            ptr.set(b);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public ColumnModifier getColumnModifier() {
+        return getChild().getColumnModifier();
+    }
+
+    /** The result has the same type as the child. */
+    @Override
+    public final PDataType getDataType() {
+        return getChild().getDataType();
+    }
+
+    /** Delegates straight to the child, so visitors never see this node itself. */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return getChild().accept(visitor);
+    }
+
+    /** Renders as {@code CEIL(<child>)}. */
+    @Override
+    public String toString() {
+        // There is exactly one child; looping to size() - 1 never printed it.
+        StringBuilder buf = new StringBuilder("CEIL(");
+        buf.append(getChild());
+        buf.append(")");
+        return buf.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java b/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java
new file mode 100644
index 0000000..05ce968
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.sql.Timestamp;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+public class CeilingTimestampExpression extends BaseSingleExpression {
+
+    public CeilingTimestampExpression() {
+    }
+
+    public CeilingTimestampExpression(Expression child) {
+        super(child);
+    }
+
+    /** Amount added to the timestamp's time value when it has to be rounded up. */
+    protected int getRoundUpAmount() {
+        return 1;
+    }
+
+    /**
+     * Evaluates the child and, when the resulting timestamp has a non-zero nanos
+     * component, replaces it with a timestamp whose time value is increased by
+     * {@link #getRoundUpAmount()}. Otherwise the child's bytes are left as they are.
+     *
+     * @return false if the child could not be evaluated, true otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression child = children.get(0);
+        if (child.evaluate(tuple, ptr)) {
+            PDataType childType = child.getDataType();
+            // Decode through a per-call scratch pointer so that ptr itself is only
+            // changed when rounding happens. A scratch buffer shared through a static
+            // field would be overwritten by other instances and threads mid-evaluation.
+            ImmutableBytesWritable scratch = new ImmutableBytesWritable();
+            scratch.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+            childType.coerceBytes(scratch, childType, child.getColumnModifier(), null);
+            Timestamp value = (Timestamp) childType.toObject(scratch);
+            if (value.getNanos() > 0) {
+                value = new Timestamp(value.getTime()+getRoundUpAmount());
+                byte[] b = childType.toBytes(value, child.getColumnModifier());
+                ptr.set(b);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /** The result has the same type as the child. */
+    @Override
+    public final PDataType getDataType() {
+        return children.get(0).getDataType();
+    }
+
+    /** Delegates straight to the child, so visitors never see this node itself. */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return getChild().accept(visitor);
+    }
+
+
+    /** Renders as {@code CEIL(<child>)}. */
+    @Override
+    public String toString() {
+        // There is exactly one child; looping to size() - 1 never printed it.
+        StringBuilder buf = new StringBuilder("CEIL(");
+        buf.append(getChild());
+        buf.append(")");
+        return buf.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CoerceExpression.java b/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
new file mode 100644
index 0000000..bbb54a0
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Expression that coerces the bytes produced by its single child to a target
+ * data type, optionally with a target column modifier and byte size.
+ * <p>
+ * NOTE(review): equals/hashCode below only look at byteSize, toMod and toType;
+ * whether the child is compared elsewhere cannot be told from this class - confirm.
+ */
+public class CoerceExpression extends BaseSingleExpression {
+    private PDataType toType;
+    private ColumnModifier toMod;
+    private Integer byteSize;
+
+    /** No-arg constructor; fields are populated by {@link #readFields(DataInput)}. */
+    public CoerceExpression() {
+    }
+
+    /**
+     * Builds an expression yielding {@code expression} as {@code toType}.
+     *
+     * @param expression the expression to coerce
+     * @param toType the desired data type
+     * @return {@code expression} itself if it already has {@code toType}; a new
+     *         literal constant of {@code toType} if it is a {@link LiteralExpression};
+     *         otherwise a new CoerceExpression wrapping it
+     * @throws SQLException declared by the original signature
+     */
+    public static Expression create(Expression expression, PDataType toType) throws SQLException {
+        if (toType == expression.getDataType()) {
+            return expression;
+        }
+        if (expression instanceof LiteralExpression) {
+            LiteralExpression literal = (LiteralExpression)expression;
+            return LiteralExpression.newConstant(literal.getValue(), toType);
+        }
+        return new CoerceExpression(expression, toType);
+    }
+
+    //Package protected for tests
+    CoerceExpression(Expression expression, PDataType toType) {
+        this(expression, toType, null, null);
+    }
+
+    CoerceExpression(Expression expression, PDataType toType, ColumnModifier toMod, Integer byteSize) {
+        super(expression);
+        this.toType = toType;
+        this.toMod = toMod;
+        this.byteSize = byteSize;
+    }
+
+    /** @return the byte size given at construction/deserialization, possibly null */
+    @Override
+    public Integer getByteSize() {
+        return byteSize;
+    }
+
+    /** @return the same value as {@link #getByteSize()} */
+    @Override
+    public Integer getMaxLength() {
+        return byteSize;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 31 * hash + (byteSize == null ? 0 : byteSize.hashCode());
+        hash = 31 * hash + (toMod == null ? 0 : toMod.hashCode());
+        hash = 31 * hash + (toType == null ? 0 : toType.hashCode());
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        CoerceExpression that = (CoerceExpression)obj;
+        boolean sameByteSize = (byteSize == null)
+                ? that.byteSize == null
+                : byteSize.equals(that.byteSize);
+        return sameByteSize && toMod == that.toMod && toType == that.toType;
+    }
+
+    /**
+     * Reads the superclass state followed by the target type ordinal, the column
+     * modifier system value and the byte size (-1 encodes null).
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        PDataType[] allTypes = PDataType.values();
+        toType = allTypes[WritableUtils.readVInt(input)];
+        int modifierValue = WritableUtils.readVInt(input);
+        toMod = ColumnModifier.fromSystemValue(modifierValue);
+        int serializedByteSize = WritableUtils.readVInt(input);
+        if (serializedByteSize == -1) {
+            this.byteSize = null;
+        } else {
+            this.byteSize = serializedByteSize;
+        }
+    }
+
+    /**
+     * Writes the superclass state followed by the target type ordinal, the column
+     * modifier system value and the byte size (null is written as -1).
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        int typeOrdinal = toType.ordinal();
+        WritableUtils.writeVInt(output, typeOrdinal);
+        int modifierValue = ColumnModifier.toSystemValue(toMod);
+        WritableUtils.writeVInt(output, modifierValue);
+        int serializedByteSize = (byteSize == null) ? -1 : byteSize;
+        WritableUtils.writeVInt(output, serializedByteSize);
+    }
+
+    /**
+     * Evaluates the child into {@code ptr} and coerces those bytes in place from the
+     * child's type and column modifier to this expression's type and column modifier.
+     *
+     * @return true if the child could be evaluated, false otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression child = getChild();
+        if (!child.evaluate(tuple, ptr)) {
+            return false;
+        }
+        getDataType().coerceBytes(ptr, child.getDataType(), child.getColumnModifier(), getColumnModifier());
+        return true;
+    }
+
+    /** @return the target data type */
+    @Override
+    public PDataType getDataType() {
+        return toType;
+    }
+
+    /** @return the target column modifier, possibly null */
+    @Override
+    public ColumnModifier getColumnModifier() {
+        return toMod;
+    }
+
+    /**
+     * Visits the children, then lets the visitor leave this node; falls back to the
+     * visitor's default return when leaving yields null.
+     */
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> childResults = acceptChildren(visitor, visitor.visitEnter(this));
+        T result = visitor.visitLeave(this, childResults);
+        return result != null ? result : visitor.defaultReturn(this, childResults);
+    }
+
+    /** @return {@code TO_<toType>(<children separated by ", ">)} */
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("TO_").append(toType.toString()).append("(");
+        int lastIndex = children.size() - 1;
+        for (int i = 0; i < lastIndex; i++) {
+            buf.append(children.get(i)).append(", ");
+        }
+        buf.append(children.get(lastIndex)).append(")");
+        return buf.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/ColumnExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/ColumnExpression.java b/src/main/java/org/apache/phoenix/expression/ColumnExpression.java
new file mode 100644
index 0000000..bfb0d70
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/ColumnExpression.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Objects;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+
+/**
+ *
+ * Common base class for column value accessors. Holds the column's data type,
+ * nullability, byte size, max length, scale and column modifier, and serializes
+ * them compactly.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class ColumnExpression extends BaseTerminalExpression {
+    protected PDataType type;
+    private Integer byteSize;
+    private boolean isNullable;
+    private Integer maxLength;
+    private Integer scale;
+    private ColumnModifier columnModifier;
+
+    /** No-arg constructor; fields are populated by {@link #readFields(DataInput)}. */
+    public ColumnExpression() {
+    }
+
+    /**
+     * True when the type is fixed width but reports no byte size of its own, in
+     * which case the byte size is tracked (and serialized) per column.
+     */
+    private static boolean needsExplicitByteSize(PDataType dataType) {
+        return dataType.isFixedWidth() && dataType.getByteSize() == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 31 * hash + (isNullable() ? 1231 : 1237);
+        Integer size = getByteSize();
+        hash = 31 * hash + (size == null ? 0 : size.hashCode());
+        PDataType dataType = getDataType();
+        hash = 31 * hash + (dataType == null ? 0 : dataType.hashCode());
+        return hash;
+    }
+
+    /**
+     * Two column expressions of the same class are equal when their nullability,
+     * byte size and data type match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        ColumnExpression that = (ColumnExpression)obj;
+        return isNullable() == that.isNullable()
+                && Objects.equal(getByteSize(), that.getByteSize())
+                && getDataType() == that.getDataType();
+    }
+
+    /**
+     * Copies the column metadata from the given datum. The datum's byte size is
+     * only retained when the type is fixed width without a byte size of its own.
+     *
+     * @param datum source of the type, nullability, sizes, scale and column modifier
+     */
+    public ColumnExpression(PDatum datum) {
+        this.type = datum.getDataType();
+        this.isNullable = datum.isNullable();
+        if (needsExplicitByteSize(this.type)) {
+            this.byteSize = datum.getByteSize();
+        }
+        this.maxLength = datum.getMaxLength();
+        this.scale = datum.getScale();
+        this.columnModifier = datum.getColumnModifier();
+    }
+
+    @Override
+    public boolean isNullable() {
+        return isNullable;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return type;
+    }
+
+    @Override
+    public ColumnModifier getColumnModifier() {
+        return columnModifier;
+    }
+
+    /** @return the explicitly tracked byte size if set, else the superclass's value */
+    @Override
+    public Integer getByteSize() {
+        return byteSize != null ? byteSize : super.getByteSize();
+    }
+
+    @Override
+    public Integer getMaxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public Integer getScale() {
+        return scale;
+    }
+
+    /**
+     * Reads the state written by {@link #write(DataOutput)}. The first vint packs
+     * the isNullable bit (bit 0), scale presence (bit 1), maxLength presence (bit 2)
+     * and the type ordinal (remaining bits) together to save space.
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int header = WritableUtils.readVInt(input);
+        boolean hasScale = (header & 0x02) != 0;
+        boolean hasMaxLength = (header & 0x04) != 0;
+        isNullable = (header & 0x01) != 0;
+        if (hasScale) {
+            scale = WritableUtils.readVInt(input);
+        }
+        if (hasMaxLength) {
+            maxLength = WritableUtils.readVInt(input);
+        }
+        type = PDataType.values()[header >>> 3];
+        if (needsExplicitByteSize(type)) {
+            byteSize = WritableUtils.readVInt(input);
+        }
+        columnModifier = ColumnModifier.fromSystemValue(WritableUtils.readVInt(input));
+    }
+
+    /**
+     * Writes a vint packing the isNullable bit (bit 0), scale presence (bit 1),
+     * maxLength presence (bit 2) and the type ordinal (remaining bits), followed by
+     * the scale and maxLength when present, the byte size when the type needs an
+     * explicit one, and finally the column modifier system value.
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        int nullableBit = isNullable ? 1 : 0;
+        int scaleBit = (scale != null ? 1 : 0) << 1;
+        int maxLengthBit = (maxLength != null ? 1 : 0) << 2;
+        int header = nullableBit | scaleBit | maxLengthBit | (type.ordinal() << 3);
+        WritableUtils.writeVInt(output, header);
+        if (scale != null) {
+            WritableUtils.writeVInt(output, scale);
+        }
+        if (maxLength != null) {
+            WritableUtils.writeVInt(output, maxLength);
+        }
+        if (needsExplicitByteSize(type)) {
+            // NOTE(review): byteSize is unboxed here; presumably it is always non-null
+            // for such types - confirm against the PDatum implementations.
+            WritableUtils.writeVInt(output, byteSize);
+        }
+        WritableUtils.writeVInt(output, ColumnModifier.toSystemValue(columnModifier));
+    }
+}