You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/09 02:23:42 UTC
[2/3] Rename stat package to stats
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
new file mode 100644
index 0000000..8e82a88
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TimeKeeper;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Wrapper to access the statistics table SYSTEM.STATS using the HTable.
+ */
+/**
+ * Wrapper to access the statistics table SYSTEM.STATS using the HTable.
+ */
+public class StatisticsWriter implements Closeable {
+    /**
+     * Creates a {@link StatisticsWriter} for the statistics table and immediately records the
+     * time at which stats were last updated for the given primary table.
+     *
+     * @param hTable
+     *            HTable connected to the SYSTEM.STATS statistics table
+     * @param tableName
+     *            name of the primary table on which stats are being collected
+     * @param clientTimeStamp
+     *            timestamp to use for the stats cells; if it is
+     *            {@link HConstants#LATEST_TIMESTAMP} the current wall-clock time is used instead
+     * @return the {@link StatisticsWriter} for the given primary table
+     * @throws IOException
+     *             if the last-stats-updated-time row cannot be written to the stats table
+     */
+    public static StatisticsWriter getStatisticsTable(HTableInterface hTable, String tableName, long clientTimeStamp) throws IOException {
+        if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
+            clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+        }
+        StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp);
+        statsTable.commitLastStatsUpdatedTime();
+        return statsTable;
+    }
+
+    // Handle on the SYSTEM.STATS table; closed via close().
+    private final HTableInterface statisticsTable;
+    // Primary table name, pre-encoded as VARCHAR bytes for use in stats row keys.
+    private final byte[] tableName;
+    // Timestamp applied to every stats cell written by this instance.
+    private final long clientTimeStamp;
+
+    private StatisticsWriter(HTableInterface statsTable, String tableName, long clientTimeStamp) {
+        this.statisticsTable = statsTable;
+        this.tableName = PDataType.VARCHAR.toBytes(tableName);
+        this.clientTimeStamp = clientTimeStamp;
+    }
+
+    /**
+     * Close the connection to the table
+     */
+    @Override
+    public void close() throws IOException {
+        statisticsTable.close();
+    }
+
+    /**
+     * Update a list of statistics for a given region. If the ANALYZE &lt;tablename&gt; query is
+     * issued then we use Upsert queries to update the table. If the region gets split or a major
+     * compaction happens we update using HTable.put().
+     *
+     * @param regionName
+     *            the region of the primary table for which the stats are being collected
+     * @param tracker
+     *            the statistics tracker holding the collected guide posts and min/max keys;
+     *            a no-op if null
+     * @param fam
+     *            the column family for which the stats are being collected
+     * @param mutations
+     *            list that accumulates the generated mutations so they can be committed in a batch
+     * @throws IOException
+     *             if we fail to build any of the puts. Any single failure will prevent any future
+     *             attempts for the remaining list of stats to update
+     */
+    public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
+        if (tracker == null) { return; }
+
+        byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
+                PDataType.VARCHAR.toBytes(regionName));
+        Put put = new Put(prefix);
+        if (tracker.getGuidePosts(fam) != null) {
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES,
+                    clientTimeStamp, (tracker.getGuidePosts(fam)));
+        }
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
+                clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
+                clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+        // Add our empty column value so queries behave correctly
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                clientTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+        mutations.add(put);
+    }
+
+    // Maps an HBase client mutation to its protobuf MutationType for the multi-row RPC.
+    private static MutationType getMutationType(Mutation m) throws IOException {
+        if (m instanceof Put) {
+            return MutationType.PUT;
+        } else if (m instanceof Delete) {
+            return MutationType.DELETE;
+        } else {
+            // Include the offending class name, separated from the message text.
+            throw new DoNotRetryIOException("Unsupported mutation type in stats commit: "
+                    + m.getClass().getName());
+        }
+    }
+
+    /**
+     * Commits the accumulated stats mutations atomically via the MultiRowMutation coprocessor
+     * endpoint hosted on the region that owns the first mutation's row.
+     *
+     * @param mutations
+     *            the mutations to commit; a no-op if empty
+     * @throws IOException
+     *             if the RPC fails; the underlying {@link ServiceException} is converted and
+     *             rethrown so callers see the failure
+     */
+    public void commitStats(List<Mutation> mutations) throws IOException {
+        if (mutations.size() > 0) {
+            byte[] row = mutations.get(0).getRow();
+            MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+            for (Mutation m : mutations) {
+                mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(getMutationType(m), m));
+            }
+            MutateRowsRequest mrm = mrmBuilder.build();
+            CoprocessorRpcChannel channel = statisticsTable.coprocessorService(row);
+            MultiRowMutationService.BlockingInterface service =
+                    MultiRowMutationService.newBlockingStub(channel);
+            try {
+                service.mutateRows(null, mrm);
+            } catch (ServiceException ex) {
+                // Propagate the failure; previously the converted exception was silently dropped.
+                throw ProtobufUtil.toIOException(ex);
+            }
+        }
+    }
+
+    private void commitLastStatsUpdatedTime() throws IOException {
+        // Always use wallclock time for this, as it's a mechanism to prevent
+        // stats from being collected too often.
+        long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+        byte[] prefix = tableName;
+        Put put = new Put(prefix);
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, clientTimeStamp,
+                PDataType.DATE.toBytes(new Date(currentTime)));
+        statisticsTable.put(put);
+    }
+
+    /**
+     * Adds a delete for the stats row of the given region/family so stale stats are removed
+     * before fresh ones are written at {@code clientTimeStamp}.
+     *
+     * @param regionName
+     *            the region whose stats row should be deleted
+     * @param tracker
+     *            the statistics tracker (unused here; kept for signature symmetry with addStats)
+     * @param fam
+     *            the column family whose stats row should be deleted
+     * @param mutations
+     *            list that accumulates the generated mutations so they can be committed in a batch
+     * @throws IOException
+     *             if the row key cannot be built
+     */
+    public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
+            throws IOException {
+        byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
+                PDataType.VARCHAR.toBytes(regionName));
+        // Delete all versions strictly older than clientTimeStamp so the new stats survive.
+        mutations.add(new Delete(prefix, clientTimeStamp - 1));
+    }
+}
\ No newline at end of file