Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:51 UTC
[35/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
new file mode 100644
index 0000000..9f01fff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+
+
+/**
+ *
+ * Coprocessor protocol for Phoenix DDL. Phoenix stores the table metadata in
+ * an HBase table named SYSTEM.TABLE. Each table is represented by:
+ * - one row for the table
+ * - one row per column in the table
+ * Up to {@link #DEFAULT_MAX_META_DATA_VERSIONS} versions are kept. The
+ * timestamp of the metadata must always be increasing. The timestamp of the
+ * key values in the data row corresponds to the schema it is using.
+ *
+ * TODO: dynamically prune the number of schema versions kept based on whether
+ * or not the data table still uses them (based on the min timestamp of the
+ * data table).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface MetaDataProtocol extends CoprocessorProtocol {
+ public static final int PHOENIX_MAJOR_VERSION = 3;
+ public static final int PHOENIX_MINOR_VERSION = 0;
+ public static final int PHOENIX_PATCH_NUMBER = 0;
+ public static final int PHOENIX_VERSION =
+ MetaDataUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
+
+ public static final long MIN_TABLE_TIMESTAMP = 0;
+ // Increase MIN_SYSTEM_TABLE_TIMESTAMP by one for each SYSTEM.TABLE schema change.
+ // For 1.0, 1.1, 1.2, and 1.2.1 we used MetaDataProtocol.MIN_TABLE_TIMESTAMP+1
+ // For 2.0 and above, we use MetaDataProtocol.MIN_TABLE_TIMESTAMP+7 so that we can add the five new
+ // columns to the existing system table (three new columns in 1.2.1 and two new columns in 1.2)
+ // For 3.0 and above, we use MIN_TABLE_TIMESTAMP + 8 so that we can add the tenant_id column
+ // as the first column to the existing system table.
+ // For 3.1 (SNAPSHOT) and above, we use MIN_TABLE_TIMESTAMP + 9 so that we can add
+ // the multi_tenant and multi_type columns for multi-tenancy.
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 9;
+ public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
+
+ // TODO: pare this down to the minimum, as we don't need duplicates for both table and column errors, nor should we need
+ // a different code for every type of error.
+ // ENTITY_ALREADY_EXISTS, ENTITY_NOT_FOUND, NEWER_ENTITY_FOUND, ENTITY_NOT_IN_REGION, CONCURRENT_MODIFICATION
+ // ILLEGAL_MUTATION (+ sql code)
+ public enum MutationCode {
+ TABLE_ALREADY_EXISTS,
+ TABLE_NOT_FOUND,
+ COLUMN_NOT_FOUND,
+ COLUMN_ALREADY_EXISTS,
+ CONCURRENT_TABLE_MUTATION,
+ TABLE_NOT_IN_REGION,
+ NEWER_TABLE_FOUND,
+ UNALLOWED_TABLE_MUTATION,
+ NO_PK_COLUMNS,
+ PARENT_TABLE_NOT_FOUND
+ };
+
+ public static class MetaDataMutationResult implements Writable {
+ private MutationCode returnCode;
+ private long mutationTime;
+ private PTable table;
+ private List<byte[]> tableNamesToDelete;
+ private byte[] columnName;
+ private byte[] familyName;
+ private boolean wasUpdated;
+
+ public MetaDataMutationResult() {
+ }
+
+ public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, PColumn column) {
+ this(returnCode, currentTime, table);
+ if(column != null){
+ this.columnName = column.getName().getBytes();
+ this.familyName = column.getFamilyName().getBytes();
+ }
+
+ }
+
+ public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table) {
+ this(returnCode, currentTime, table, Collections.<byte[]> emptyList());
+ }
+
+ public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, List<byte[]> tableNamesToDelete) {
+ this.returnCode = returnCode;
+ this.mutationTime = currentTime;
+ this.table = table;
+ this.tableNamesToDelete = tableNamesToDelete;
+ }
+
+ public MutationCode getMutationCode() {
+ return returnCode;
+ }
+
+ public long getMutationTime() {
+ return mutationTime;
+ }
+
+ public boolean wasUpdated() {
+ return wasUpdated;
+ }
+
+ public PTable getTable() {
+ return table;
+ }
+
+ public void setTable(PTable table) {
+ this.table = table;
+ }
+
+ public List<byte[]> getTableNamesToDelete() {
+ return tableNamesToDelete;
+ }
+
+ public byte[] getColumnName() {
+ return columnName;
+ }
+
+ public byte[] getFamilyName() {
+ return familyName;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.returnCode = MutationCode.values()[WritableUtils.readVInt(input)];
+ this.mutationTime = input.readLong();
+ wasUpdated = input.readBoolean();
+ if (wasUpdated) {
+ this.table = new PTableImpl();
+ this.table.readFields(input);
+ }
+ columnName = Bytes.readByteArray(input);
+ if (columnName.length > 0) {
+ familyName = Bytes.readByteArray(input);
+ }
+ boolean hasTablesToDelete = input.readBoolean();
+ if (hasTablesToDelete) {
+ int count = input.readInt();
+ tableNamesToDelete = Lists.newArrayListWithExpectedSize(count);
+ for( int i = 0 ; i < count ; i++ ){
+ byte[] tableName = Bytes.readByteArray(input);
+ tableNamesToDelete.add(tableName);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ WritableUtils.writeVInt(output, returnCode.ordinal());
+ output.writeLong(mutationTime);
+ output.writeBoolean(table != null);
+ if (table != null) {
+ table.write(output);
+ }
+ Bytes.writeByteArray(output, columnName == null ? ByteUtil.EMPTY_BYTE_ARRAY : columnName);
+ if (columnName != null) {
+ Bytes.writeByteArray(output, familyName == null ? ByteUtil.EMPTY_BYTE_ARRAY : familyName);
+ }
+ if(tableNamesToDelete != null && tableNamesToDelete.size() > 0 ) {
+ output.writeBoolean(true);
+ output.writeInt(tableNamesToDelete.size());
+ for(byte[] tableName : tableNamesToDelete) {
+ Bytes.writeByteArray(output,tableName);
+ }
+
+ } else {
+ output.writeBoolean(false);
+ }
+
+ }
+ }
+
+ /**
+ * Gets the latest Phoenix table at or before the given clientTimestamp. If the
+ * client already has the latest (based on the tableTimestamp), then no table
+ * is returned.
+ * @param tenantId
+ * @param schemaName
+ * @param tableName
+ * @param tableTimestamp
+ * @param clientTimestamp
+ * @return MetaDataMutationResult
+ * @throws IOException
+ */
+ MetaDataMutationResult getTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimestamp) throws IOException;
+
+ /**
+ * Create a new Phoenix table
+ * @param tableMetadata
+ * @return MetaDataMutationResult
+ * @throws IOException
+ */
+ MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException;
+
+ /**
+ * Drop an existing Phoenix table
+ * @param tableMetadata
+ * @param tableType
+ * @return MetaDataMutationResult
+ * @throws IOException
+ */
+ MetaDataMutationResult dropTable(List<Mutation> tableMetadata, String tableType) throws IOException;
+
+ /**
+ * Add a column to an existing Phoenix table
+ * @param tableMetadata
+ * @return MetaDataMutationResult
+ * @throws IOException
+ */
+ MetaDataMutationResult addColumn(List<Mutation> tableMetadata) throws IOException;
+
+ /**
+ * Drop a column from an existing Phoenix table
+ * @param tableMetadata
+ * @return MetaDataMutationResult
+ * @throws IOException
+ */
+ MetaDataMutationResult dropColumn(List<Mutation> tableMetadata) throws IOException;
+
+ MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata) throws IOException;
+
+ /**
+ * Clears the server-side cache of table meta data. Used between test runs to
+ * ensure no side effects.
+ */
+ void clearCache();
+
+ /**
+ * Get the version of the server-side HBase and phoenix.jar. Used when initially connecting
+ * to a cluster to ensure that the client and server jars are compatible.
+ */
+ long getVersion();
+}
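
As a quick illustration of the Writable contract above, here is a minimal sketch (not part of this commit) that round-trips a table-less MetaDataMutationResult through a byte array, the same way the coprocessor framework would marshal it; the mutation code and timestamp are arbitrary example values.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
    import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;

    public class MetaDataMutationResultRoundTrip {
        public static void main(String[] args) throws IOException {
            // A result with no PTable/column payload; TABLE_NOT_FOUND and 1000L are example values.
            MetaDataMutationResult original =
                    new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 1000L, null);

            // Serialize with the write() implementation above.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            original.write(out);
            out.flush();

            // Deserialize into a fresh instance with readFields().
            MetaDataMutationResult copy = new MetaDataMutationResult();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(copy.getMutationCode()); // TABLE_NOT_FOUND
            System.out.println(copy.getMutationTime()); // 1000
        }
    }
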
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
new file mode 100644
index 0000000..d0bd7c5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import org.apache.phoenix.cache.GlobalCache;
+
+
+/**
+ * Coprocessor for metadata-related operations. This coprocessor is only registered
+ * on SYSTEM.TABLE.
+ */
+public class MetaDataRegionObserver extends BaseRegionObserver {
+
+ @Override
+ public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean abortRequested) {
+ GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java
new file mode 100644
index 0000000..ee9e6e2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+
+public class ScanProjector {
+
+ public enum ProjectionType {TABLE, CF, CQ};
+
+ private static final String SCAN_PROJECTOR = "scanProjector";
+ private static final byte[] SEPERATOR = Bytes.toBytes(":");
+
+ private final ProjectionType type;
+ private final byte[] tablePrefix;
+ private final Map<ImmutableBytesPtr, byte[]> cfProjectionMap;
+ private final Map<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> cqProjectionMap;
+
+ private ScanProjector(ProjectionType type, byte[] tablePrefix,
+ Map<ImmutableBytesPtr, byte[]> cfProjectionMap, Map<ImmutableBytesPtr,
+ Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> cqProjectionMap) {
+ this.type = type;
+ this.tablePrefix = tablePrefix;
+ this.cfProjectionMap = cfProjectionMap;
+ this.cqProjectionMap = cqProjectionMap;
+ }
+
+ public static void serializeProjectorIntoScan(Scan scan, ScanProjector projector) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, projector.type.ordinal());
+ switch (projector.type) {
+ case TABLE:
+ WritableUtils.writeCompressedByteArray(output, projector.tablePrefix);
+ break;
+ case CF:
+ WritableUtils.writeVInt(output, projector.cfProjectionMap.size());
+ for (Map.Entry<ImmutableBytesPtr, byte[]> entry : projector.cfProjectionMap.entrySet()) {
+ WritableUtils.writeCompressedByteArray(output, entry.getKey().get());
+ WritableUtils.writeCompressedByteArray(output, entry.getValue());
+ }
+ break;
+ case CQ:
+ WritableUtils.writeVInt(output, projector.cqProjectionMap.size());
+ for (Map.Entry<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> entry :
+ projector.cqProjectionMap.entrySet()) {
+ WritableUtils.writeCompressedByteArray(output, entry.getKey().get());
+ Map<ImmutableBytesPtr, Pair<byte[], byte[]>> map = entry.getValue();
+ WritableUtils.writeVInt(output, map.size());
+ for (Map.Entry<ImmutableBytesPtr, Pair<byte[], byte[]>> e : map.entrySet()) {
+ WritableUtils.writeCompressedByteArray(output, e.getKey().get());
+ WritableUtils.writeCompressedByteArray(output, e.getValue().getFirst());
+ WritableUtils.writeCompressedByteArray(output, e.getValue().getSecond());
+ }
+ }
+ break;
+ default:
+ throw new IOException("Unrecognized projection type '" + projector.type + "'");
+ }
+ scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ public static ScanProjector deserializeProjectorFromScan(Scan scan) {
+ byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+ if (proj == null) {
+ return null;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ int t = WritableUtils.readVInt(input);
+ ProjectionType type = ProjectionType.values()[t];
+ if (type == ProjectionType.TABLE) {
+ byte[] tablePrefix = WritableUtils.readCompressedByteArray(input);
+ return new ScanProjector(type, tablePrefix, null, null);
+ }
+ if (type == ProjectionType.CF) {
+ int count = WritableUtils.readVInt(input);
+ Map<ImmutableBytesPtr, byte[]> cfMap = new HashMap<ImmutableBytesPtr, byte[]>();
+ for (int i = 0; i < count; i++) {
+ byte[] cf = WritableUtils.readCompressedByteArray(input);
+ byte[] renamed = WritableUtils.readCompressedByteArray(input);
+ cfMap.put(new ImmutableBytesPtr(cf), renamed);
+ }
+ return new ScanProjector(type, null, cfMap, null);
+ }
+
+ int count = WritableUtils.readVInt(input);
+ Map<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> cqMap =
+ new HashMap<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>>();
+ for (int i = 0; i < count; i++) {
+ byte[] cf = WritableUtils.readCompressedByteArray(input);
+ int nQuals = WritableUtils.readVInt(input);
+ Map<ImmutableBytesPtr, Pair<byte[], byte[]>> map =
+ new HashMap<ImmutableBytesPtr, Pair<byte[], byte[]>>();
+ for (int j = 0; j < nQuals; j++) {
+ byte[] cq = WritableUtils.readCompressedByteArray(input);
+ byte[] renamedCf = WritableUtils.readCompressedByteArray(input);
+ byte[] renamedCq = WritableUtils.readCompressedByteArray(input);
+ map.put(new ImmutableBytesPtr(cq), new Pair<byte[], byte[]>(renamedCf, renamedCq));
+ }
+ cqMap.put(new ImmutableBytesPtr(cf), map);
+ }
+ return new ScanProjector(type, null, null, cqMap);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public ProjectionType getType() {
+ return this.type;
+ }
+
+ public byte[] getTablePrefix() {
+ return this.tablePrefix;
+ }
+
+ public Map<ImmutableBytesPtr, byte[]> getCfProjectionMap() {
+ return this.cfProjectionMap;
+ }
+
+ public Map<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> getCqProjectionMap() {
+ return this.cqProjectionMap;
+ }
+
+ public KeyValue getProjectedKeyValue(KeyValue kv) {
+ if (type == ProjectionType.TABLE) {
+ byte[] cf = ByteUtil.concat(tablePrefix, SEPERATOR, kv.getFamily());
+ return KeyValueUtil.newKeyValue(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+ cf, kv.getQualifier(), kv.getTimestamp(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ }
+
+ if (type == ProjectionType.CF) {
+ byte[] cf = cfProjectionMap.get(new ImmutableBytesPtr(kv.getFamily()));
+ if (cf == null)
+ return kv;
+ return KeyValueUtil.newKeyValue(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+ cf, kv.getQualifier(), kv.getTimestamp(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ }
+
+ Map<ImmutableBytesPtr, Pair<byte[], byte[]>> map = cqProjectionMap.get(new ImmutableBytesPtr(kv.getFamily()));
+ if (map == null)
+ return kv;
+
+ Pair<byte[], byte[]> col = map.get(new ImmutableBytesPtr(kv.getQualifier()));
+ if (col == null)
+ return kv;
+
+ return KeyValueUtil.newKeyValue(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+ col.getFirst(), col.getSecond(), kv.getTimestamp(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ }
+}
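
The projector travels to the region server as a Scan attribute. Because the constructor is private in this snapshot, the following sketch (illustrative only, not part of this commit) writes the CF-projection wire format by hand (a vint projection type, a vint entry count, then compressed byte-array pairs), attaches it under the "scanProjector" attribute key, and decodes it with deserializeProjectorFromScan; the family names are made up.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.WritableUtils;

    import org.apache.phoenix.coprocessor.ScanProjector;
    import org.apache.phoenix.coprocessor.ScanProjector.ProjectionType;

    public class ScanProjectorWireFormatExample {
        public static void main(String[] args) throws IOException {
            // Hand-roll the CF projection payload: type ordinal, entry count, then
            // (original family, renamed family) pairs as compressed byte arrays.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            WritableUtils.writeVInt(out, ProjectionType.CF.ordinal());
            WritableUtils.writeVInt(out, 1);
            WritableUtils.writeCompressedByteArray(out, Bytes.toBytes("CF1"));   // original family (example)
            WritableUtils.writeCompressedByteArray(out, Bytes.toBytes("T.CF1")); // renamed family (example)
            out.flush();

            Scan scan = new Scan();
            scan.setAttribute("scanProjector", bytes.toByteArray()); // same key as SCAN_PROJECTOR

            ScanProjector projector = ScanProjector.deserializeProjectorFromScan(scan);
            byte[] renamed = projector.getCfProjectionMap().values().iterator().next();
            System.out.println(Bytes.toString(renamed)); // T.CF1
        }
    }
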
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
new file mode 100644
index 0000000..cb76b70
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.RegionScannerResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ *
+ * Wraps the scan performing a non-aggregate query to prevent needless retries
+ * when a Phoenix bug is encountered in our custom filter expression evaluation.
+ * Unfortunately, until HBASE-7481 gets fixed, there's no way to do this from our
+ * custom filters.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanRegionObserver extends BaseScannerRegionObserver {
+ public static final String NON_AGGREGATE_QUERY = "NonAggregateQuery";
+ private static final String TOPN = "TopN";
+
+ public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, thresholdBytes);
+ WritableUtils.writeVInt(output, limit);
+ WritableUtils.writeVInt(output, estimatedRowSize);
+ WritableUtils.writeVInt(output, orderByExpressions.size());
+ for (OrderByExpression orderingCol : orderByExpressions) {
+ orderingCol.write(output);
+ }
+ scan.setAttribute(TOPN, stream.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+ byte[] topN = scan.getAttribute(TOPN);
+ if (topN == null) {
+ return null;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ int thresholdBytes = WritableUtils.readVInt(input);
+ int limit = WritableUtils.readVInt(input);
+ int estimatedRowSize = WritableUtils.readVInt(input);
+ int size = WritableUtils.readVInt(input);
+ List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);
+ for (int i = 0; i < size; i++) {
+ OrderByExpression orderByExpression = new OrderByExpression();
+ orderByExpression.readFields(input);
+ orderByExpressions.add(orderByExpression);
+ }
+ ResultIterator inner = new RegionScannerResultIterator(s);
+ return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, estimatedRowSize);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
+ byte[] isScanQuery = scan.getAttribute(NON_AGGREGATE_QUERY);
+
+ if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) {
+ return s;
+ }
+
+ final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+ final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
+
+ RegionScanner innerScanner = s;
+ if (p != null || j != null) {
+ innerScanner = new HashJoinRegionScanner(s, p, j, tenantId, c.getEnvironment());
+ }
+
+ final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
+ if (iterator == null) {
+ return getWrappedScanner(c, innerScanner);
+ }
+
+ return getTopNScanner(c, innerScanner, iterator, tenantId);
+ }
+
+ /**
+ * Return a region scanner that does TopN.
+ * We only need to call startRegionOperation and closeRegionOperation when
+ * getting the first Tuple (which forces running through the entire region),
+ * since after that everything is held in memory.
+ */
+ private RegionScanner getTopNScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final OrderedResultIterator iterator, ImmutableBytesWritable tenantId) throws Throwable {
+ final Tuple firstTuple;
+ TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), tenantId);
+ long estSize = iterator.getEstimatedByteSize();
+ final MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
+ final HRegion region = c.getEnvironment().getRegion();
+ region.startRegionOperation();
+ try {
+ // Once we return from the first call to next, we've run through and cached
+ // the topN rows, so we no longer need to start/stop a region operation.
+ firstTuple = iterator.next();
+ // Now that the topN are cached, we can resize based on the real size
+ long actualSize = iterator.getByteSize();
+ chunk.resize(actualSize);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+ return null;
+ } finally {
+ region.closeRegionOperation();
+ }
+ return new BaseRegionScanner() {
+ private Tuple tuple = firstTuple;
+
+ @Override
+ public boolean isFilterDone() {
+ return tuple == null;
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return s.getRegionInfo();
+ }
+
+ @Override
+ public boolean next(List<KeyValue> results) throws IOException {
+ try {
+ if (isFilterDone()) {
+ return false;
+ }
+
+ for (int i = 0; i < tuple.size(); i++) {
+ results.add(tuple.getValue(i));
+ }
+
+ tuple = iterator.next();
+ return !isFilterDone();
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ s.close();
+ } finally {
+ chunk.close();
+ }
+ }
+ };
+ }
+
+ /**
+ * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
+ * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
+ * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
+ * the same from a custom filter.
+ */
+ private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s) {
+ return new RegionScanner() {
+
+ @Override
+ public boolean next(List<KeyValue> results) throws IOException {
+ try {
+ return s.next(results);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> results, String metric) throws IOException {
+ try {
+ return s.next(results, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> result, int limit) throws IOException {
+ try {
+ return s.next(result, limit);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
+ try {
+ return s.next(result, limit, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ s.close();
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return s.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() {
+ return s.isFilterDone();
+ }
+
+ @Override
+ public boolean reseek(byte[] row) throws IOException {
+ return s.reseek(row);
+ }
+
+ @Override
+ public long getMvccReadPoint() {
+ return s.getMvccReadPoint();
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> result, String metric) throws IOException {
+ try {
+ return s.nextRaw(result, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException {
+ try {
+ return s.nextRaw(result, limit, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+ };
+ }
+
+}
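
For orientation, here is a minimal client-side sketch (not part of this commit) of how a Scan might be flagged for this observer: mark it as a non-aggregate query and attach the TopN parameters via serializeIntoScan. The threshold, limit, and row-size numbers are arbitrary, and a real query would pass the ORDER BY expressions built by the query compiler rather than an empty list.

    import java.util.Collections;

    import org.apache.hadoop.hbase.client.Scan;

    import org.apache.phoenix.coprocessor.ScanRegionObserver;
    import org.apache.phoenix.expression.OrderByExpression;
    import org.apache.phoenix.schema.PDataType;

    public class TopNScanSetupExample {
        public static void main(String[] args) {
            Scan scan = new Scan();
            // Flag the scan as a non-aggregate Phoenix query so doPostScannerOpen wraps it.
            scan.setAttribute(ScanRegionObserver.NON_AGGREGATE_QUERY, PDataType.TRUE_BYTES);
            // Attach the TopN parameters: byte threshold for OrderedResultIterator, row limit,
            // ORDER BY expressions (empty here purely for illustration), estimated row size.
            ScanRegionObserver.serializeIntoScan(
                    scan,
                    1024 * 1024, // thresholdBytes (example)
                    100,         // limit (example)
                    Collections.<OrderByExpression>emptyList(),
                    200);        // estimatedRowSize (example)
        }
    }
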
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
new file mode 100644
index 0000000..ed04cc3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -0,0 +1,258 @@
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+/**
+ *
+ * Region observer coprocessor for sequence operations:
+ * 1) For creating a sequence, as checkAndPut does not allow us to scope the
+ * Get done for the check with a TimeRange.
+ * 2) For incrementing a sequence, as increment does not a) allow us to set the
+ * timestamp of the key value being incremented or b) recognize when the key
+ * value being incremented does not exist.
+ * 3) For deleting a sequence, as checkAndDelete does not allow us to scope
+ * the Get done for the check with a TimeRange.
+ *
+ * @author jtaylor
+ * @since 3.0.0
+ */
+public class SequenceRegionObserver extends BaseRegionObserver {
+ public enum Op {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE};
+ public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION";
+ public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE";
+ public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE";
+ private static final byte[] SUCCESS_VALUE = PDataType.INTEGER.toBytes(Integer.valueOf(Sequence.SUCCESS));
+
+ private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
+ byte[] errorCodeBuf = new byte[PDataType.INTEGER.getByteSize()];
+ PDataType.INTEGER.getCodec().encodeInt(errorCode, errorCodeBuf, 0);
+ return new Result(Collections.singletonList(
+ KeyValueUtil.newKeyValue(row,
+ PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES,
+ QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf)));
+ }
+ /**
+ *
+ * Use the preIncrement hook of BaseRegionObserver to overcome deficiencies in the
+ * Increment implementation (HBASE-10254):
+ * 1) Lack of recognition and identification of when the key value to increment doesn't exist
+ * 2) Lack of the ability to set the timestamp of the updated key value.
+ * Works the same as the existing region.increment(), except it assumes there is a single
+ * column to increment and uses Phoenix LONG encoding.
+ * @author jtaylor
+ * @since 3.0.0
+ */
+ @Override
+ public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Increment increment) throws IOException {
+ RegionCoprocessorEnvironment env = e.getEnvironment();
+ // We need to set this to prevent region.increment from being called
+ e.bypass();
+ e.complete();
+ HRegion region = env.getRegion();
+ byte[] row = increment.getRow();
+ TimeRange tr = increment.getTimeRange();
+ region.startRegionOperation();
+ try {
+ Integer lid = region.getLock(null, row, true);
+ try {
+ long maxTimestamp = tr.getMax();
+ if (maxTimestamp == HConstants.LATEST_TIMESTAMP) {
+ maxTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+ tr = new TimeRange(tr.getMin(), maxTimestamp);
+ }
+ Get get = new Get(row);
+ get.setTimeRange(tr.getMin(), tr.getMax());
+ for (Map.Entry<byte[],NavigableMap<byte[], Long>> entry : increment.getFamilyMap().entrySet()) {
+ byte[] cf = entry.getKey();
+ for (byte[] cq : entry.getValue().keySet()) {
+ get.addColumn(cf, cq);
+ }
+ }
+ Result result = region.get(get);
+ if (result.isEmpty()) {
+ return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
+ }
+ KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
+ KeyValue incrementByKV = Sequence.getIncrementByKV(result);
+ KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
+ long value = PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), currentValueKV.getValueOffset(), null);
+ long incrementBy = PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(), incrementByKV.getValueOffset(), null);
+ int cacheSize = PDataType.INTEGER.getCodec().decodeInt(cacheSizeKV.getBuffer(), cacheSizeKV.getValueOffset(), null);
+ value += incrementBy * cacheSize;
+ byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
+ PDataType.LONG.getCodec().encodeLong(value, valueBuffer, 0);
+ Put put = new Put(row, currentValueKV.getTimestamp());
+ // Hold timestamp constant for sequences, so that clients always only see the latest value
+ // regardless of when they connect.
+ KeyValue newCurrentValueKV = KeyValueUtil.newKeyValue(row, currentValueKV.getFamily(), currentValueKV.getQualifier(), currentValueKV.getTimestamp(), valueBuffer);
+ put.add(newCurrentValueKV);
+ @SuppressWarnings("unchecked")
+ Pair<Mutation,Integer>[] mutations = new Pair[1];
+ mutations[0] = new Pair<Mutation,Integer>(put, lid);
+ region.batchMutate(mutations);
+ return Sequence.replaceCurrentValueKV(result, newCurrentValueKV);
+ } finally {
+ region.releaseRowLock(lid);
+ }
+ } catch (Throwable t) {
+ ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t);
+ return null; // Impossible
+ } finally {
+ region.closeRegionOperation();
+ }
+ }
+
+ /**
+ * Override the preAppend for checkAndPut and checkAndDelete, as we need the ability to
+ * a) set the TimeRange for the Get being done and
+ * b) return something back to the client to indicate success/failure
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public Result preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Append append) throws IOException {
+ byte[] opBuf = append.getAttribute(OPERATION_ATTRIB);
+ if (opBuf == null) {
+ return null;
+ }
+ Op op = Op.values()[opBuf[0]];
+ KeyValue keyValue = append.getFamilyMap().values().iterator().next().iterator().next();
+
+ long clientTimestamp = HConstants.LATEST_TIMESTAMP;
+ long minGetTimestamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+ long maxGetTimestamp = HConstants.LATEST_TIMESTAMP;
+ boolean hadClientTimestamp;
+ byte[] clientTimestampBuf = null;
+ if (op == Op.RETURN_SEQUENCE) {
+ // When returning sequences, this allows us to send the expected timestamp
+ // of the sequence to make sure we don't reset any other sequence
+ hadClientTimestamp = true;
+ clientTimestamp = minGetTimestamp = keyValue.getTimestamp();
+ maxGetTimestamp = minGetTimestamp + 1;
+ } else {
+ clientTimestampBuf = append.getAttribute(MAX_TIMERANGE_ATTRIB);
+ if (clientTimestampBuf != null) {
+ clientTimestamp = maxGetTimestamp = Bytes.toLong(clientTimestampBuf);
+ }
+ hadClientTimestamp = (clientTimestamp != HConstants.LATEST_TIMESTAMP);
+ if (hadClientTimestamp) {
+ // Prevent race condition of creating two sequences at the same timestamp
+ // by looking for a sequence at or after the timestamp at which it'll be
+ // created.
+ if (op == Op.CREATE_SEQUENCE) {
+ maxGetTimestamp = clientTimestamp + 1;
+ }
+ } else {
+ clientTimestamp = maxGetTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+ clientTimestampBuf = Bytes.toBytes(clientTimestamp);
+ }
+ }
+
+ RegionCoprocessorEnvironment env = e.getEnvironment();
+ // We need to set this to prevent region.append from being called
+ e.bypass();
+ e.complete();
+ HRegion region = env.getRegion();
+ byte[] row = append.getRow();
+ region.startRegionOperation();
+ try {
+ Integer lid = region.getLock(null, row, true);
+ try {
+ byte[] family = keyValue.getFamily();
+ byte[] qualifier = keyValue.getQualifier();
+
+ Get get = new Get(row);
+ get.setTimeRange(minGetTimestamp, maxGetTimestamp);
+ get.addColumn(family, qualifier);
+ Result result = region.get(get);
+ if (result.isEmpty()) {
+ if (op == Op.DROP_SEQUENCE || op == Op.RETURN_SEQUENCE) {
+ return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
+ }
+ } else {
+ if (op == Op.CREATE_SEQUENCE) {
+ return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_ALREADY_EXIST.getErrorCode());
+ }
+ }
+ Mutation m = null;
+ switch (op) {
+ case RETURN_SEQUENCE:
+ KeyValue currentValueKV = result.raw()[0];
+ long expectedValue = PDataType.LONG.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, null);
+ long value = PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), currentValueKV.getValueOffset(), null);
+ // Timestamp should match exactly, or we may have the wrong sequence
+ if (expectedValue != value || currentValueKV.getTimestamp() != clientTimestamp) {
+ return new Result(Collections.singletonList(KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, currentValueKV.getTimestamp(), ByteUtil.EMPTY_BYTE_ARRAY)));
+ }
+ m = new Put(row, currentValueKV.getTimestamp());
+ m.getFamilyMap().putAll(append.getFamilyMap());
+ break;
+ case DROP_SEQUENCE:
+ m = new Delete(row, clientTimestamp, null);
+ break;
+ case CREATE_SEQUENCE:
+ m = new Put(row, clientTimestamp);
+ m.getFamilyMap().putAll(append.getFamilyMap());
+ break;
+ }
+ if (!hadClientTimestamp) {
+ for (List<KeyValue> kvs : m.getFamilyMap().values()) {
+ for (KeyValue kv : kvs) {
+ kv.updateLatestStamp(clientTimestampBuf);
+ }
+ }
+ }
+ @SuppressWarnings("unchecked")
+ Pair<Mutation,Integer>[] mutations = new Pair[1];
+ mutations[0] = new Pair<Mutation,Integer>(m, lid);
+ region.batchMutate(mutations);
+ long serverTimestamp = MetaDataUtil.getClientTimeStamp(m);
+ // Return result with single KeyValue. The only piece of information
+ // the client cares about is the timestamp, which is the timestamp of
+ // when the mutation was actually performed (useful in the case of .
+ return new Result(Collections.singletonList(KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
+ } finally {
+ region.releaseRowLock(lid);
+ }
+ } catch (Throwable t) {
+ ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t);
+ return null; // Impossible
+ } finally {
+ region.closeRegionOperation();
+ }
+ }
+
+}
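
To make the preIncrement contract concrete, here is a sketch (illustrative, not part of this commit) of the shape of a client-side Increment that this coprocessor intercepts: the TimeRange scopes the server-side Get, and the increment amount itself is irrelevant because the coprocessor recomputes the new value from the stored increment-by and cache-size key values. The column family and qualifier names are placeholders.

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SequenceIncrementSketch {
        // Issues an Increment against a sequence row; family/qualifier names are placeholders.
        static Result incrementSequence(HTableInterface sequenceTable, byte[] sequenceRowKey,
                long clientTimestamp) throws IOException {
            Increment inc = new Increment(sequenceRowKey);
            // The TimeRange bounds the Get that preIncrement performs server-side.
            inc.setTimeRange(0L, clientTimestamp);
            // The amount is ignored: preIncrement recomputes value += incrementBy * cacheSize.
            inc.addColumn(Bytes.toBytes("0"), Bytes.toBytes("CURRENT_VALUE"), 1L);
            return sequenceTable.increment(inc);
        }
    }
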
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
new file mode 100644
index 0000000..b27c14c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+
+
+
+
+/**
+ *
+ * Server-side implementation of {@link ServerCachingProtocol}
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCachingEndpointImpl extends BaseEndpointCoprocessor implements ServerCachingProtocol {
+
+ @Override
+ public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+ TenantCache tenantCache = GlobalCache.getTenantCache((RegionCoprocessorEnvironment)this.getEnvironment(), tenantId == null ? null : new ImmutableBytesPtr(tenantId));
+ tenantCache.addServerCache(new ImmutableBytesPtr(cacheId), cachePtr, cacheFactory);
+ return true;
+ }
+
+ @Override
+ public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException {
+ TenantCache tenantCache = GlobalCache.getTenantCache((RegionCoprocessorEnvironment)this.getEnvironment(), tenantId == null ? null : new ImmutableBytesPtr(tenantId));
+ tenantCache.removeServerCache(new ImmutableBytesPtr(cacheId));
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
new file mode 100644
index 0000000..abda834
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+
+/**
+ *
+ * EndPoint coprocessor to send a cache to a region server.
+ * Used for:
+ * a) hash joins, to send the smaller side of the join to each region server
+ * b) secondary indexes, to send the necessary meta data to each region server
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ServerCachingProtocol extends CoprocessorProtocol {
+ public static interface ServerCacheFactory extends Writable {
+ public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException;
+ }
+ /**
+ * Add the cache to the region server cache.
+ * @param tenantId the tenantId or null if not applicable
+ * @param cacheId unique identifier of the cache
+ * @param cachePtr pointer to the byte array of the cache
+ * @param cacheFactory factory that converts from byte array to object representation on the server side
+ * @return true on success and otherwise throws
+ * @throws SQLException
+ */
+ public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+ /**
+ * Remove the cache from the region server cache. Called upon completion of
+ * the operation, when the cache is no longer needed.
+ * @param tenantId the tenantId or null if not applicable
+ * @param cacheId unique identifier of the cache
+ * @return true on success and otherwise throws
+ * @throws SQLException
+ */
+ public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException;
+}
\ No newline at end of file
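
As a usage sketch (not part of this commit), a client could reach this endpoint through the 0.94-style coprocessorProxy API; the row key, cache id, payload, and factory below are placeholders, and in Phoenix they are produced by the hash join and index code paths.

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    import org.apache.phoenix.coprocessor.ServerCachingProtocol;
    import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;

    public class ServerCacheClientSketch {
        // Adds and later removes a server-side cache on the region hosting rowKey.
        static void sendCache(HTableInterface table, byte[] rowKey, byte[] cacheId,
                ImmutableBytesWritable cachePtr, ServerCacheFactory factory) throws SQLException {
            ServerCachingProtocol endpoint =
                    table.coprocessorProxy(ServerCachingProtocol.class, rowKey);
            endpoint.addServerCache(null /* tenantId */, cacheId, cachePtr, factory);
            // ... run the join or index maintenance that consumes the cache ...
            endpoint.removeServerCache(null /* tenantId */, cacheId);
        }
    }
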
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
new file mode 100644
index 0000000..c7c4575
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -0,0 +1,423 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.client.KeyValueBuilder;
+import org.apache.phoenix.exception.ValueTypeIncompatibleException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ * Region observer that aggregates ungrouped rows (i.e. a SQL query with an aggregation function and no GROUP BY).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+ private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
+ // TODO: move all constants into a single class
+ public static final String UNGROUPED_AGG = "UngroupedAgg";
+ public static final String DELETE_AGG = "DeleteAgg";
+ public static final String UPSERT_SELECT_TABLE = "UpsertSelectTable";
+ public static final String UPSERT_SELECT_EXPRS = "UpsertSelectExprs";
+ public static final String DELETE_CQ = "DeleteCQ";
+ public static final String DELETE_CF = "DeleteCF";
+ public static final String EMPTY_CF = "EmptyCF";
+ private KeyValueBuilder kvBuilder;
+
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
+ super.start(e);
+ this.kvBuilder = KeyValueBuilder.get(e.getHBaseVersion());
+ }
+
+ private static void commitBatch(HRegion region, List<Pair<Mutation,Integer>> mutations, byte[] indexUUID) throws IOException {
+ if (indexUUID != null) {
+ for (Pair<Mutation,Integer> pair : mutations) {
+ pair.getFirst().setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+ }
+ }
+ @SuppressWarnings("unchecked")
+ Pair<Mutation,Integer>[] mutationArray = new Pair[mutations.size()];
+ // TODO: should we use the one that is all or none?
+ region.batchMutate(mutations.toArray(mutationArray));
+ }
+
+ public static void serializeIntoScan(Scan scan) {
+ scan.setAttribute(UNGROUPED_AGG, QueryConstants.TRUE);
+ }
+
+ @Override
+ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
+ byte[] isUngroupedAgg = scan.getAttribute(UNGROUPED_AGG);
+ if (isUngroupedAgg == null) {
+ return s;
+ }
+
+ final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+ final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ RegionScanner theScanner = s;
+ if (p != null || j != null) {
+ theScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
+ }
+ final RegionScanner innerScanner = theScanner;
+
+ byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+ PTable projectedTable = null;
+ List<Expression> selectExpressions = null;
+ byte[] upsertSelectTable = scan.getAttribute(UPSERT_SELECT_TABLE);
+ boolean isUpsert = false;
+ boolean isDelete = false;
+ byte[] deleteCQ = null;
+ byte[] deleteCF = null;
+ byte[][] values = null;
+ byte[] emptyCF = null;
+ ImmutableBytesWritable ptr = null;
+ if (upsertSelectTable != null) {
+ isUpsert = true;
+ projectedTable = deserializeTable(upsertSelectTable);
+ selectExpressions = deserializeExpressions(scan.getAttribute(UPSERT_SELECT_EXPRS));
+ values = new byte[projectedTable.getPKColumns().size()][];
+ ptr = new ImmutableBytesWritable();
+ } else {
+ byte[] isDeleteAgg = scan.getAttribute(DELETE_AGG);
+ isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+ if (!isDelete) {
+ deleteCF = scan.getAttribute(DELETE_CF);
+ deleteCQ = scan.getAttribute(DELETE_CQ);
+ }
+ emptyCF = scan.getAttribute(EMPTY_CF);
+ }
+
+ int batchSize = 0;
+ long ts = scan.getTimeRange().getMax();
+ HRegion region = c.getEnvironment().getRegion();
+ List<Pair<Mutation,Integer>> mutations = Collections.emptyList();
+ if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null) {
+ // TODO: size better
+ mutations = Lists.newArrayListWithExpectedSize(1024);
+ batchSize = c.getEnvironment().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ }
+ Aggregators aggregators = ServerAggregators.deserialize(
+ scan.getAttribute(GroupedAggregateRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration());
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ boolean hasMore;
+ boolean hasAny = false;
+ MultiKeyValueTuple result = new MultiKeyValueTuple();
+ if (logger.isInfoEnabled()) {
+ logger.info("Starting ungrouped coprocessor scan " + scan);
+ }
+ long rowCount = 0;
+ MultiVersionConsistencyControl.setThreadReadPoint(innerScanner.getMvccReadPoint());
+ region.startRegionOperation();
+ try {
+ do {
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ // Results are potentially returned even when the return value of nextRaw is false,
+ // since that value only indicates whether or not there are more rows after the
+ // ones returned.
+ hasMore = innerScanner.nextRaw(results, null);
+ if (!results.isEmpty()) {
+ rowCount++;
+ result.setKeyValues(results);
+ try {
+ if (isDelete) {
+ @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+ // FIXME: the version of the Delete constructor without the lock args was introduced
+ // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
+ // of the client.
+ Delete delete = new Delete(results.get(0).getRow(),ts,null);
+ mutations.add(new Pair<Mutation,Integer>(delete,null));
+ } else if (isUpsert) {
+ Arrays.fill(values, null);
+ int i = 0;
+ List<PColumn> projectedColumns = projectedTable.getColumns();
+ for (; i < projectedTable.getPKColumns().size(); i++) {
+ Expression expression = selectExpressions.get(i);
+ if (expression.evaluate(result, ptr)) {
+ values[i] = ptr.copyBytes();
+ // If the ColumnModifier of the expression in the SELECT doesn't match that of the
+ // column being projected into, then invert the bits.
+ if (expression.getColumnModifier() != projectedColumns.get(i).getColumnModifier()) {
+ ColumnModifier.SORT_DESC.apply(values[i], 0, values[i], 0, values[i].length);
+ }
+ }
+ }
+ projectedTable.newKey(ptr, values);
+ PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
+ for (; i < projectedColumns.size(); i++) {
+ Expression expression = selectExpressions.get(i);
+ if (expression.evaluate(result, ptr)) {
+ PColumn column = projectedColumns.get(i);
+ byte[] bytes = ptr.copyBytes();
+ Object value = expression.getDataType().toObject(bytes, column.getColumnModifier());
+ // If the ColumnModifier of the expression in the SELECT doesn't match that of the
+ // column being projected into, then invert the bits.
+ if (expression.getColumnModifier() != column.getColumnModifier()) {
+ ColumnModifier.SORT_DESC.apply(bytes, 0, bytes, 0, bytes.length);
+ }
+ // We are guaranteed that the two columns will have the same type.
+ if (!column.getDataType().isSizeCompatible(column.getDataType(),
+ value, bytes,
+ expression.getMaxLength(), column.getMaxLength(),
+ expression.getScale(), column.getScale())) {
+ throw new ValueTypeIncompatibleException(column.getDataType(),
+ column.getMaxLength(), column.getScale());
+ }
+ bytes = column.getDataType().coerceBytes(bytes, value, expression.getDataType(),
+ expression.getMaxLength(), expression.getScale(), column.getMaxLength(), column.getScale());
+ row.setValue(column, bytes);
+ }
+ }
+ for (Mutation mutation : row.toRowMutations()) {
+ mutations.add(new Pair<Mutation,Integer>(mutation,null));
+ }
+ } else if (deleteCF != null && deleteCQ != null) {
+ // No need to search for the delete column, since we project only it
+ // when no empty key value is being set.
+ if (emptyCF == null || result.getValue(deleteCF, deleteCQ) != null) {
+ Delete delete = new Delete(results.get(0).getRow());
+ delete.deleteColumns(deleteCF, deleteCQ, ts);
+ mutations.add(new Pair<Mutation,Integer>(delete,null));
+ }
+ }
+ if (emptyCF != null) {
+ /*
+ * If we've specified an emptyCF, then we need to insert an empty
+ * key value "retroactively" for any key value that is visible at
+ * the timestamp that the DDL was issued. Key values that are not
+ * visible at this timestamp will not ever be projected up to
+ * scans past this timestamp, so don't need to be considered.
+ * We insert one empty key value per row per timestamp.
+ */
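+ // For example, a row with cells at timestamps 10 and 20 (hypothetical values)
+ // gets one empty key value written at 10 and one at 20.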
+ Set<Long> timeStamps = Sets.newHashSetWithExpectedSize(results.size());
+ for (KeyValue kv : results) {
+ long kvts = kv.getTimestamp();
+ if (!timeStamps.contains(kvts)) {
+ Put put = new Put(kv.getRow());
+ put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
+ mutations.add(new Pair<Mutation,Integer>(put,null));
+ }
+ }
+ }
+ // Commit in batches based on MUTATE_BATCH_SIZE_ATTRIB in config
+ if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) {
+ commitBatch(region, mutations, indexUUID);
+ mutations.clear();
+ }
+ } catch (ConstraintViolationException e) {
+ // Log and ignore in count
+ logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e);
+ continue;
+ }
+ aggregators.aggregate(rowAggregators, result);
+ hasAny = true;
+ }
+ } while (hasMore);
+ } finally {
+ innerScanner.close();
+ region.closeRegionOperation();
+ }
+
+ if (logger.isInfoEnabled()) {
+ logger.info("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan);
+ }
+
+ if (!mutations.isEmpty()) {
+ commitBatch(region, mutations, indexUUID);
+ }
+
+ final boolean hadAny = hasAny;
+ KeyValue keyValue = null;
+ if (hadAny) {
+ byte[] value = aggregators.toBytes(rowAggregators);
+ keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+ }
+ final KeyValue aggKeyValue = keyValue;
+
+ RegionScanner scanner = new BaseRegionScanner() {
+ private boolean done = !hadAny;
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return innerScanner.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() {
+ return done;
+ }
+
+ @Override
+ public void close() throws IOException {
+ innerScanner.close();
+ }
+
+ @Override
+ public boolean next(List<KeyValue> results) throws IOException {
+ if (done) return false;
+ done = true;
+ results.add(aggKeyValue);
+ return false;
+ }
+ };
+ return scanner;
+ }
+
+ private static PTable deserializeTable(byte[] b) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(b);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ PTable table = new PTableImpl();
+ table.readFields(input);
+ return table;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static List<Expression> deserializeExpressions(byte[] b) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(b);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ int size = WritableUtils.readVInt(input);
+ List<Expression> selectExpressions = Lists.newArrayListWithExpectedSize(size);
+ for (int i = 0; i < size; i++) {
+ ExpressionType type = ExpressionType.values()[WritableUtils.readVInt(input)];
+ Expression selectExpression = type.newInstance();
+ selectExpression.readFields(input);
+ selectExpressions.add(selectExpression);
+ }
+ return selectExpressions;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static byte[] serialize(PTable projectedTable) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ projectedTable.write(output);
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static byte[] serialize(List<Expression> selectExpressions) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, selectExpressions.size());
+ for (int i = 0; i < selectExpressions.size(); i++) {
+ Expression expression = selectExpressions.get(i);
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+ expression.write(output);
+ }
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
new file mode 100644
index 0000000..d1b0b18
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+
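+/**
+ * SQLException that wraps an underlying I/O error (typically from an HBase call)
+ * so it can be surfaced to JDBC callers with the IO_EXCEPTION SQLState and error code.
+ */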
+public class PhoenixIOException extends SQLException {
+ private static final long serialVersionUID = 1L;
+ private static final SQLExceptionCode code = SQLExceptionCode.IO_EXCEPTION;
+
+ public PhoenixIOException(Throwable e) {
+ super(e.getMessage(), code.getSQLState(), code.getErrorCode(), e);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
new file mode 100644
index 0000000..fc96923
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLSyntaxErrorException;
+
+import org.antlr.runtime.MismatchedTokenException;
+import org.antlr.runtime.MissingTokenException;
+import org.antlr.runtime.RecognitionException;
+import org.antlr.runtime.Token;
+import org.antlr.runtime.UnwantedTokenException;
+
+
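+/**
+ * SQLSyntaxErrorException thrown when Phoenix fails to parse a SQL statement.
+ * Translates the underlying ANTLR RecognitionException (missing, unwanted, or
+ * mismatched token) into a readable message that includes the line and column
+ * of the offending token, along with the matching SQLExceptionCode.
+ */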
+public class PhoenixParserException extends SQLSyntaxErrorException {
+ private static final long serialVersionUID = 1L;
+
+ public static final PhoenixParserException newException(Throwable cause, String[] tokens) {
+ return new PhoenixParserException(getErrorMessage(cause, tokens), cause);
+ }
+
+ public PhoenixParserException(String msg, Throwable throwable) {
+ super(new SQLExceptionInfo.Builder(getErrorCode(throwable)).setRootCause(throwable)
+ .setMessage(msg).build().toString(),
+ getErrorCode(throwable).getSQLState(), getErrorCode(throwable).getErrorCode(), throwable);
+ }
+
+ public static String getLine(RecognitionException e) {
+ return Integer.toString(e.token.getLine());
+ }
+
+ public static String getColumn(RecognitionException e) {
+ return Integer.toString(e.token.getCharPositionInLine() + 1);
+ }
+
+ public static String getTokenLocation(RecognitionException e) {
+ return "line " + getLine(e) + ", column " + getColumn(e) + ".";
+ }
+
+ public static String getErrorMessage(Throwable e, String[] tokenNames) {
+ String msg;
+ if (e instanceof MissingTokenException) {
+ MissingTokenException mte = (MissingTokenException)e;
+ String tokenName;
+ if (mte.expecting == Token.EOF) {
+ tokenName = "EOF";
+ } else {
+ tokenName = tokenNames[mte.expecting];
+ }
+ msg = "Missing \""+ tokenName +"\" at "+ getTokenLocation(mte);
+ } else if (e instanceof UnwantedTokenException) {
+ UnwantedTokenException ute = (UnwantedTokenException)e;
+ String tokenName;
+ if (ute.expecting == Token.EOF) {
+ tokenName = "EOF";
+ } else {
+ tokenName = tokenNames[ute.expecting];
+ }
+ msg = "Unexpected input. Expecting \"" + tokenName + "\", got \"" + ute.getUnexpectedToken().getText()
+ + "\" at " + getTokenLocation(ute);
+ } else if (e instanceof MismatchedTokenException) {
+ MismatchedTokenException mte = (MismatchedTokenException)e;
+ String tokenName;
+ if (mte.expecting == Token.EOF) {
+ tokenName = "EOF";
+ } else {
+ tokenName = tokenNames[mte.expecting];
+ }
+ msg = "Mismatched input. Expecting \"" + tokenName + "\", got \"" + mte.token.getText()
+ + "\" at " + getTokenLocation(mte);
+ } else if (e instanceof RecognitionException) {
+ RecognitionException re = (RecognitionException) e;
+ msg = "Encountered \"" + re.token.getText() + "\" at " + getTokenLocation(re);
+ } else if (e instanceof UnknownFunctionException) {
+ UnknownFunctionException ufe = (UnknownFunctionException) e;
+ msg = "Unknown function: \"" + ufe.getFuncName() + "\".";
+ } else {
+ msg = e.getMessage();
+ }
+ return msg;
+ }
+
+ public static SQLExceptionCode getErrorCode(Throwable e) {
+ if (e instanceof MissingTokenException) {
+ return SQLExceptionCode.MISSING_TOKEN;
+ } else if (e instanceof UnwantedTokenException) {
+ return SQLExceptionCode.UNWANTED_TOKEN;
+ } else if (e instanceof MismatchedTokenException) {
+ return SQLExceptionCode.MISMATCHED_TOKEN;
+ } else if (e instanceof UnknownFunctionException) {
+ return SQLExceptionCode.UNKNOWN_FUNCTION;
+ } else {
+ return SQLExceptionCode.PARSER_ERROR;
+ }
+ }
+}