You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/27 01:29:04 UTC
[2/4] phoenix git commit: PHOENIX-1819 Build a framework to capture
and report phoenix client side request level metrics
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 5270277..bb4054b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.job.JobManager;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -255,12 +256,9 @@ public class CsvBulkLoadTool extends Configured implements Tool {
}
List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
- boolean useInstrumentedPool = conn
- .unwrap(PhoenixConnection.class)
- .getQueryServices()
- .getProps()
- .getBoolean(QueryServices.METRICS_ENABLED,
- QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED);
+ boolean useInstrumentedPool = GlobalClientMetrics.isMetricsEnabled()
+ || conn.unwrap(PhoenixConnection.class).isRequestLevelMetricsEnabled();
+
ExecutorService executor =
JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
try{
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index eb6dc3d..b500a25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.mapreduce;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@@ -32,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.PeekingResultIterator;
@@ -40,6 +43,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@@ -100,8 +104,12 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
final List<Scan> scans = pSplit.getScans();
try {
List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+ StatementContext ctx = queryPlan.getContext();
+ ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+ String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
for (Scan scan : scans) {
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan);
+ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(),
+ queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName));
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
@@ -112,7 +120,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
this.resultIterator = iterator;
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(),queryPlan.getContext().getStatement());
+ this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
Throwables.propagate(e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
index 02c1dea..79b49c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -17,9 +17,6 @@
*/
package org.apache.phoenix.memory;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_MANAGER_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_WAIT_TIME;
-
import org.apache.http.annotation.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,8 +89,6 @@ public class GlobalMemoryManager implements MemoryManager {
}
usedMemoryBytes += nBytes;
}
- MEMORY_WAIT_TIME.update(System.currentTimeMillis() - startTimeMs);
- MEMORY_MANAGER_BYTES.update(nBytes);
return nBytes;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
new file mode 100644
index 0000000..796e8ba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Version of {@link Metric} that can be used when the metric is being concurrently accessed or modified by multiple
+ * threads.
+ */
+public class AtomicMetric implements Metric {
+
+ private final MetricType type;
+ private final AtomicLong value = new AtomicLong();
+
+ public AtomicMetric(MetricType type) {
+ this.type = type;
+ }
+
+ @Override
+ public String getName() {
+ return type.name();
+ }
+
+ @Override
+ public String getDescription() {
+ return type.description();
+ }
+
+ @Override
+ public long getValue() {
+ return value.get();
+ }
+
+ @Override
+ public void change(long delta) {
+ value.addAndGet(delta);
+ }
+
+ @Override
+ public void increment() {
+ value.incrementAndGet();
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return getName() + ": " + value.get();
+ }
+
+ @Override
+ public void reset() {
+ value.set(0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
new file mode 100644
index 0000000..7ebb0c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+
+/**
+ * Interface for representing a metric that could be published and possibly combined with a metric of the same
+ * type.
+ */
+public interface CombinableMetric extends Metric {
+
+ String getPublishString();
+
+ CombinableMetric combine(CombinableMetric metric);
+
+ public class NoOpRequestMetric implements CombinableMetric {
+
+ public static NoOpRequestMetric INSTANCE = new NoOpRequestMetric();
+ private static final String EMPTY_STRING = "";
+
+ @Override
+ public String getName() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public String getDescription() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public long getValue() {
+ return 0;
+ }
+
+ @Override
+ public void change(long delta) {}
+
+ @Override
+ public void increment() {}
+
+ @Override
+ public String getCurrentMetricState() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public void reset() {}
+
+ @Override
+ public String getPublishString() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public CombinableMetric combine(CombinableMetric metric) {
+ return INSTANCE;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
new file mode 100644
index 0000000..fa6f7d3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CombinableMetricImpl implements CombinableMetric {
+
+ private final Metric metric;
+
+ public CombinableMetricImpl(MetricType type) {
+ metric = new NonAtomicMetric(type);
+ }
+
+ @Override
+ public String getName() {
+ return metric.getName();
+ }
+
+ @Override
+ public String getDescription() {
+ return metric.getDescription();
+ }
+
+ @Override
+ public long getValue() {
+ return metric.getValue();
+ }
+
+ @Override
+ public void change(long delta) {
+ metric.change(delta);
+ }
+
+ @Override
+ public void increment() {
+ metric.increment();
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return metric.getCurrentMetricState();
+ }
+
+ @Override
+ public void reset() {
+ metric.reset();
+ }
+
+ @Override
+ public String getPublishString() {
+ return getCurrentMetricState();
+ }
+
+ @Override
+ public CombinableMetric combine(CombinableMetric metric) {
+ checkArgument(this.getClass().equals(metric.getClass()));
+ this.metric.change(metric.getValue());
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
deleted file mode 100644
index 141294d..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Incrementing only counter that keeps track of the
- * number of occurrences of something.
- *
- */
-@ThreadSafe
-class Counter implements Metric {
-
- private final AtomicLong counter;
- private final String name;
- private final String description;
-
- public Counter(String name, String description) {
- this.name = name;
- this.description = description;
- this.counter = new AtomicLong(0);
- }
-
- public long increment() {
- return counter.incrementAndGet();
- }
-
- public long getCurrentCount() {
- return counter.get();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-
- @Override
- public void reset() {
- counter.set(0);
- }
-
- @Override
- public String toString() {
- return "Name: " + name + ", Current count: " + counter.get();
- }
-
- @Override
- public String getCurrentMetricState() {
- return toString();
- }
-
- @Override
- public long getNumberOfSamples() {
- return getCurrentCount();
- }
-
- @Override
- public long getTotalSum() {
- return getCurrentCount();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
new file mode 100644
index 0000000..a8f3bb4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Central place where we keep track of all the global client phoenix metrics. These metrics are different from
+ * {@link ReadMetricQueue} or {@link MutationMetricQueue} as they are collected at the client JVM level as opposed
+ * to the above two which are collected for every phoenix request.
+ */
+
+public enum GlobalClientMetrics {
+
+ GLOBAL_MUTATION_BATCH_SIZE(MUTATION_BATCH_SIZE),
+ GLOBAL_MUTATION_BYTES(MUTATION_BYTES),
+ GLOBAL_MUTATION_COMMIT_TIME(MUTATION_COMMIT_TIME),
+ GLOBAL_QUERY_TIME(QUERY_TIME),
+ GLOBAL_NUM_PARALLEL_SCANS(NUM_PARALLEL_SCANS),
+ GLOBAL_SCAN_BYTES(SCAN_BYTES),
+ GLOBAL_SPOOL_FILE_SIZE(SPOOL_FILE_SIZE),
+ GLOBAL_MEMORY_CHUNK_BYTES(MEMORY_CHUNK_BYTES),
+ GLOBAL_MEMORY_WAIT_TIME(MEMORY_WAIT_TIME),
+ GLOBAL_TASK_QUEUE_WAIT_TIME(TASK_QUEUE_WAIT_TIME),
+ GLOBAL_TASK_END_TO_END_TIME(TASK_END_TO_END_TIME),
+ GLOBAL_TASK_EXECUTION_TIME(TASK_EXECUTION_TIME),
+ GLOBAL_MUTATION_SQL_COUNTER(MUTATION_SQL_COUNTER),
+ GLOBAL_SELECT_SQL_COUNTER(SELECT_SQL_COUNTER),
+ GLOBAL_TASK_EXECUTED_COUNTER(TASK_EXECUTED_COUNTER),
+ GLOBAL_REJECTED_TASK_COUNTER(TASK_REJECTED_COUNTER),
+ GLOBAL_QUERY_TIMEOUT_COUNTER(QUERY_TIMEOUT_COUNTER),
+ GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
+ GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER);
+
+ private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
+ private GlobalMetric metric;
+
+ public void update(long value) {
+ if (isGlobalMetricsEnabled) {
+ metric.change(value);
+ }
+ }
+
+ @VisibleForTesting
+ public GlobalMetric getMetric() {
+ return metric;
+ }
+
+ @Override
+ public String toString() {
+ return metric.toString();
+ }
+
+ private GlobalClientMetrics(MetricType metricType) {
+ this.metric = new GlobalMetricImpl(metricType);
+ }
+
+ public void increment() {
+ if (isGlobalMetricsEnabled) {
+ metric.increment();
+ }
+ }
+
+ public static Collection<GlobalMetric> getMetrics() {
+ List<GlobalMetric> metrics = new ArrayList<>();
+ for (GlobalClientMetrics m : GlobalClientMetrics.values()) {
+ metrics.add(m.metric);
+ }
+ return metrics;
+ }
+
+ public static boolean isMetricsEnabled() {
+ return isGlobalMetricsEnabled;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
new file mode 100644
index 0000000..f3b562f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Class that exposes the various internal phoenix metrics collected
+ * at the JVM level. Because metrics are dynamic in nature, it is not guaranteed that the
+ * state exposed will always be in sync with each other. One should use
+ * these metrics primarily for monitoring and debugging purposes.
+ */
+public interface GlobalMetric extends Metric {
+
+ /**
+ * @return Number of samples collected since the last {@link #reset()} call.
+ */
+ public long getNumberOfSamples();
+
+ /**
+ * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+ */
+ public long getTotalSum();
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
new file mode 100644
index 0000000..26a16e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GlobalMetricImpl implements GlobalMetric {
+
+ private AtomicLong numberOfSamples = new AtomicLong(0);
+ private Metric metric;
+
+ public GlobalMetricImpl(MetricType type) {
+ this.metric = new AtomicMetric(type);
+ }
+
+ /**
+ * Reset the internal state. Typically called after metric information has been collected and a new phase of
+ * collection is being requested for the next interval.
+ */
+ @Override
+ public void reset() {
+ metric.reset();
+ numberOfSamples.set(0);
+ }
+
+ @Override
+ public long getNumberOfSamples() {
+ return numberOfSamples.get();
+ }
+
+ @Override
+ public long getTotalSum() {
+ return metric.getValue();
+ }
+
+ @Override
+ public void change(long delta) {
+ metric.change(delta);
+ numberOfSamples.incrementAndGet();
+ }
+
+ @Override
+ public void increment() {
+ metric.increment();
+ numberOfSamples.incrementAndGet();
+ }
+
+ @Override
+ public String getName() {
+ return metric.getName();
+ }
+
+ @Override
+ public String getDescription() {
+ return metric.getDescription();
+ }
+
+ @Override
+ public long getValue() {
+ return metric.getValue();
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return metric.getCurrentMetricState() + ", Number of samples: " + numberOfSamples.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
new file mode 100644
index 0000000..0e82ce4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+
+/**
+ * Class that encapsulates the metrics regarding memory resources needed for servicing a request.
+ */
+public class MemoryMetricsHolder {
+ private final CombinableMetric memoryChunkSizeMetric;
+ private final CombinableMetric memoryWaitTimeMetric;
+ public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null);
+
+ public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+ this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName);
+ this.memoryWaitTimeMetric = readMetrics.allotMetric(MEMORY_WAIT_TIME, tableName);
+ }
+
+ public CombinableMetric getMemoryChunkSizeMetric() {
+ return memoryChunkSizeMetric;
+ }
+
+ public CombinableMetric getMemoryWaitTimeMetric() {
+ return memoryWaitTimeMetric;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
index aef792c..1ad1c7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -18,47 +18,46 @@
package org.apache.phoenix.monitoring;
/**
- * Interface that exposes the various internal phoenix metrics collected.
- * Because metrics are dynamic in nature, it is not guaranteed that the
- * state exposed will always be in sync with each other. One should use
- * these metrics primarily for monitoring and debugging purposes.
+ * Interface that represents phoenix-internal metric.
*/
public interface Metric {
-
/**
- *
* @return Name of the metric
*/
public String getName();
-
+
/**
- *
* @return Description of the metric
*/
public String getDescription();
-
+
/**
- * Reset the internal state. Typically called after
- * metric information has been collected and a new
- * phase of collection is being requested for the next
- * interval.
+ * @return Current value of the metric
*/
- public void reset();
-
+ public long getValue();
+
/**
+ * Change the metric by the specified amount
*
- * @return String that represents the current state of the metric.
- * Typically used to log the current state.
+ * @param delta
+ * amount by which the metric value should be changed
*/
- public String getCurrentMetricState();
-
+ public void change(long delta);
+
+ /**
+ * Change the value of metric by 1
+ */
+ public void increment();
+
/**
- * @return Number of samples collected since the last {@link #reset()} call.
+ * @return String that represents the current state of the metric. Typically used for logging or reporting purposes.
*/
- public long getNumberOfSamples();
+ public String getCurrentMetricState();
/**
- * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+ * Reset the metric
*/
- public long getTotalSum();
+ public void reset();
+
}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
new file mode 100644
index 0000000..a0c2a4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+public enum MetricType {
+
+ MUTATION_BATCH_SIZE("Batch sizes of mutations"),
+ MUTATION_BYTES("Size of mutations in bytes"),
+ MUTATION_COMMIT_TIME("Time it took to commit mutations"),
+ QUERY_TIME("Query times"),
+ NUM_PARALLEL_SCANS("Number of scans that were executed in parallel"),
+ SCAN_BYTES("Number of bytes read by scans"),
+ MEMORY_CHUNK_BYTES("Number of bytes allocated by the memory manager"),
+ MEMORY_WAIT_TIME("Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
+ MUTATION_SQL_COUNTER("Counter for number of mutation sql statements"),
+ SELECT_SQL_COUNTER("Counter for number of sql queries"),
+ TASK_QUEUE_WAIT_TIME("Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
+ TASK_END_TO_END_TIME("Time in milliseconds spent by tasks from creation to completion"),
+ TASK_EXECUTION_TIME("Time in milliseconds tasks took to execute"),
+ TASK_EXECUTED_COUNTER("Counter for number of tasks submitted to the thread pool executor"),
+ TASK_REJECTED_COUNTER("Counter for number of tasks that were rejected by the thread pool executor"),
+ QUERY_TIMEOUT_COUNTER("Number of times query timed out"),
+ QUERY_FAILED_COUNTER("Number of times query failed"),
+ SPOOL_FILE_SIZE("Size of spool files created in bytes"),
+ SPOOL_FILE_COUNTER("Number of spool files created"),
+ CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"),
+ WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"),
+ RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()");
+
+ private final String description;
+
+ private MetricType(String description) {
+ this.description = description;
+ }
+
+ public String description() {
+ return description;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
new file mode 100644
index 0000000..bffb9ad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ *
+ * Stop watch that is cognizant of the fact whether or not metrics is enabled.
+ * If metrics isn't enabled it doesn't do anything. Otherwise, it delegates
+ * calls to a {@code Stopwatch}.
+ *
+ */
+final class MetricsStopWatch {
+
+ private final boolean isMetricsEnabled;
+ private final Stopwatch stopwatch;
+
+ MetricsStopWatch(boolean isMetricsEnabled) {
+ this.isMetricsEnabled = isMetricsEnabled;
+ this.stopwatch = new Stopwatch();
+ }
+
+ void start() {
+ if (isMetricsEnabled) {
+ stopwatch.start();
+ }
+ }
+
+ void stop() {
+ if (isMetricsEnabled) {
+ if (stopwatch.isRunning()) {
+ stopwatch.stop();
+ }
+ }
+ }
+
+ long getElapsedTimeInMs() {
+ if (isMetricsEnabled) {
+ return stopwatch.elapsedMillis();
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
new file mode 100644
index 0000000..e90da46
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Queue that tracks various writes/mutations related phoenix request metrics.
+ */
+public class MutationMetricQueue {
+
+ // Map of table name -> mutation metric
+ private Map<String, MutationMetric> tableMutationMetric = new HashMap<>();
+
+ public void addMetricsForTable(String tableName, MutationMetric metric) {
+ MutationMetric tableMetric = tableMutationMetric.get(tableName);
+ if (tableMetric == null) {
+ tableMutationMetric.put(tableName, metric);
+ } else {
+ tableMetric.combineMetric(metric);
+ }
+ }
+
+ public void combineMetricQueues(MutationMetricQueue other) {
+ Map<String, MutationMetric> tableMetricMap = other.tableMutationMetric;
+ for (Entry<String, MutationMetric> entry : tableMetricMap.entrySet()) {
+ addMetricsForTable(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Publish the metrics to wherever you want them published. The internal state is cleared out after every publish.
+ * @return map of table name -> list of pair of (metric name, metric value)
+ */
+ public Map<String, Map<String, Long>> aggregate() {
+ Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+ for (Entry<String, MutationMetric> entry : tableMutationMetric.entrySet()) {
+ String tableName = entry.getKey();
+ MutationMetric metric = entry.getValue();
+ Map<String, Long> publishedMetricsForTable = publishedMetrics.get(tableName);
+ if (publishedMetricsForTable == null) {
+ publishedMetricsForTable = new HashMap<>();
+ publishedMetrics.put(tableName, publishedMetricsForTable);
+ }
+ publishedMetricsForTable.put(metric.getNumMutations().getName(), metric.getNumMutations().getValue());
+ publishedMetricsForTable.put(metric.getMutationsSizeBytes().getName(), metric.getMutationsSizeBytes().getValue());
+ publishedMetricsForTable.put(metric.getCommitTimeForMutations().getName(), metric.getCommitTimeForMutations().getValue());
+ }
+ return publishedMetrics;
+ }
+
+ public void clearMetrics() {
+ tableMutationMetric.clear(); // help gc
+ }
+
+ /**
+ * Class that holds together the various metrics associated with mutations.
+ */
+ public static class MutationMetric {
+ private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
+ private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
+ private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
+
+ public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations) {
+ this.numMutations.change(numMutations);
+ this.mutationsSizeBytes.change(mutationsSizeBytes);
+ this.totalCommitTimeForMutations.change(commitTimeForMutations);
+ }
+
+ public CombinableMetric getCommitTimeForMutations() {
+ return totalCommitTimeForMutations;
+ }
+
+ public CombinableMetric getNumMutations() {
+ return numMutations;
+ }
+
+ public CombinableMetric getMutationsSizeBytes() {
+ return mutationsSizeBytes;
+ }
+
+ public void combineMetric(MutationMetric other) {
+ this.numMutations.combine(other.numMutations);
+ this.mutationsSizeBytes.combine(other.mutationsSizeBytes);
+ this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
+ }
+
+ }
+
+ /**
+ * Class to represent a no-op mutation metric. Used in places where request level metric tracking for mutations is not
+ * needed or desired.
+ */
+ public static class NoOpMutationMetricsQueue extends MutationMetricQueue {
+
+ public static final NoOpMutationMetricsQueue NO_OP_MUTATION_METRICS_QUEUE = new NoOpMutationMetricsQueue();
+
+ private NoOpMutationMetricsQueue() {}
+
+ @Override
+ public void addMetricsForTable(String tableName, MutationMetric metric) {}
+
+ @Override
+ public Map<String, Map<String, Long>> aggregate() { return Collections.emptyMap(); }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
new file mode 100644
index 0000000..2d92116
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Version of {@link Metric} that can be used when the metric isn't getting concurrently modified/accessed by multiple
+ * threads and the memory consistency effects of happen-before can be established. For example - phoenix client side
+ * metrics are modified/accessed by only one thread at a time. Further, the actions of threads in the phoenix client
+ * thread pool happen-before the actions of the thread that performs the aggregation of metrics. This makes
+ * {@link NonAtomicMetric} a good fit for storing Phoenix's client side request level metrics.
+ */
+class NonAtomicMetric implements Metric {
+
+ private final MetricType type;
+ private long value;
+
+ public NonAtomicMetric(MetricType type) {
+ this.type = type;
+ }
+
+ @Override
+ public String getName() {
+ return type.name();
+ }
+
+ @Override
+ public String getDescription() {
+ return type.description();
+ }
+
+ @Override
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public void change(long delta) {
+ value += delta;
+ }
+
+ @Override
+ public void increment() {
+ value++;
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return getName() + ": " + value;
+ }
+
+ @Override
+ public void reset() {
+ value = 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
new file mode 100644
index 0000000..1f71542
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.CACHE_REFRESH_SPLITS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+/**
+ * Class that represents the overall metrics associated with a query being executed by the phoenix.
+ */
+public class OverAllQueryMetrics {
+ private final MetricsStopWatch queryWatch;
+ private final MetricsStopWatch resultSetWatch;
+ private final CombinableMetric numParallelScans;
+ private final CombinableMetric wallClockTimeMS;
+ private final CombinableMetric resultSetTimeMS;
+ private final CombinableMetric queryTimedOut;
+ private final CombinableMetric queryFailed;
+ private final CombinableMetric cacheRefreshedDueToSplits;
+
+ public OverAllQueryMetrics(boolean isMetricsEnabled) {
+ queryWatch = new MetricsStopWatch(isMetricsEnabled);
+ resultSetWatch = new MetricsStopWatch(isMetricsEnabled);
+ numParallelScans = isMetricsEnabled ? new CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE;
+ wallClockTimeMS = isMetricsEnabled ? new CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE;
+ resultSetTimeMS = isMetricsEnabled ? new CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE;
+ queryTimedOut = isMetricsEnabled ? new CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE;
+ queryFailed = isMetricsEnabled ? new CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE;
+ cacheRefreshedDueToSplits = isMetricsEnabled ? new CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER)
+ : NoOpRequestMetric.INSTANCE;
+ }
+
+ public void updateNumParallelScans(long numParallelScans) {
+ this.numParallelScans.change(numParallelScans);
+ }
+
+ public void queryTimedOut() {
+ queryTimedOut.increment();
+ }
+
+ public void queryFailed() {
+ queryFailed.increment();
+ }
+
+ public void cacheRefreshedDueToSplits() {
+ cacheRefreshedDueToSplits.increment();
+ }
+
+ public void startQuery() {
+ queryWatch.start();
+ }
+
+ public void endQuery() {
+ queryWatch.stop();
+ wallClockTimeMS.change(queryWatch.getElapsedTimeInMs());
+ }
+
+ public void startResultSetWatch() {
+ resultSetWatch.start();
+ }
+
+ public void stopResultSetWatch() {
+ resultSetWatch.stop();
+ resultSetTimeMS.change(resultSetWatch.getElapsedTimeInMs());
+ }
+
+ public Map<String, Long> publish() {
+ Map<String, Long> metricsForPublish = new HashMap<>();
+ metricsForPublish.put(numParallelScans.getName(), numParallelScans.getValue());
+ metricsForPublish.put(wallClockTimeMS.getName(), wallClockTimeMS.getValue());
+ metricsForPublish.put(resultSetTimeMS.getName(), resultSetTimeMS.getValue());
+ metricsForPublish.put(queryTimedOut.getName(), queryTimedOut.getValue());
+ metricsForPublish.put(queryFailed.getName(), queryFailed.getValue());
+ metricsForPublish.put(cacheRefreshedDueToSplits.getName(), cacheRefreshedDueToSplits.getValue());
+ return metricsForPublish;
+ }
+
+ public void reset() {
+ numParallelScans.reset();
+ wallClockTimeMS.reset();
+ resultSetTimeMS.reset();
+ queryTimedOut.reset();
+ queryFailed.reset();
+ cacheRefreshedDueToSplits.reset();
+ queryWatch.stop();
+ resultSetWatch.stop();
+ }
+
+ public OverAllQueryMetrics combine(OverAllQueryMetrics metric) {
+ cacheRefreshedDueToSplits.combine(metric.cacheRefreshedDueToSplits);
+ queryFailed.combine(metric.queryFailed);
+ queryTimedOut.combine(metric.queryTimedOut);
+ numParallelScans.combine(metric.numParallelScans);
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
deleted file mode 100644
index 28e2f2e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-/**
- * Central place where we keep track of all the internal
- * phoenix metrics that we track.
- *
- */
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.phoenix.query.QueryServicesOptions;
-
-public class PhoenixMetrics {
- private static final boolean isMetricsEnabled = QueryServicesOptions.withDefaults().isMetricsEnabled();
-
- public static boolean isMetricsEnabled() {
- return isMetricsEnabled;
- }
-
- public enum SizeMetric {
- MUTATION_BATCH_SIZE("CumulativeBatchSizesOfMutations", "Cumulative batch sizes of mutations"),
- MUTATION_BYTES("CumulativeMutationSize", "Cumulative size of mutations in bytes"),
- MUTATION_COMMIT_TIME("CumulativeMutationTime", "Cumulative time it took to send mutations"),
- QUERY_TIME("QueryTime", "Cumulative query times"),
- PARALLEL_SCANS("CumulativeNumberOfParallelScans", "Cumulative number of scans executed that were executed in parallel"),
- SCAN_BYTES("CumulativeScanBytesSize", "Cumulative number of bytes read by scans"),
- SPOOL_FILE_SIZE("CumulativeSpoolFilesSize", "Cumulative size of spool files created in bytes"),
- MEMORY_MANAGER_BYTES("CumulativeBytesAllocated", "Cumulative number of bytes allocated by the memory manager"),
- MEMORY_WAIT_TIME("CumulativeMemoryWaitTime", "Cumulative number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
- TASK_QUEUE_WAIT_TIME("CumulativeTaskQueueWaitTime", "Cumulative time in milliseconds tasks had to wait in the queue of the thread pool executor"),
- TASK_END_TO_END_TIME("CumulativeTaskEndToEndTime", "Cumulative time in milliseconds spent by tasks from creation to completion"),
- TASK_EXECUTION_TIME("CumulativeTaskExecutionTime", "Cumulative time in milliseconds tasks took to execute");
-
- private final SizeStatistic metric;
-
- private SizeMetric(String metricName, String metricDescription) {
- metric = new SizeStatistic(metricName, metricDescription);
- }
-
- public void update(long value) {
- if (isMetricsEnabled) {
- metric.add(value);
- }
- }
-
- // exposed for testing.
- public Metric getMetric() {
- return metric;
- }
-
- @Override
- public String toString() {
- return metric.toString();
- }
- }
-
- public enum CountMetric {
- MUTATION_COUNT("NumMutationCounter", "Counter for number of mutation statements"),
- QUERY_COUNT("NumQueryCounter", "Counter for number of queries"),
- TASK_COUNT("NumberOfTasksCounter", "Counter for number of tasks submitted to the thread pool executor"),
- REJECTED_TASK_COUNT("RejectedTasksCounter", "Counter for number of tasks that were rejected by the thread pool executor"),
- QUERY_TIMEOUT("QueryTimeoutCounter", "Number of times query timed out"),
- FAILED_QUERY("QueryFailureCounter", "Number of times query failed"),
- NUM_SPOOL_FILE("NumSpoolFilesCounter", "Number of spool files created");
-
- private final Counter metric;
-
- private CountMetric(String metricName, String metricDescription) {
- metric = new Counter(metricName, metricDescription);
- }
-
- public void increment() {
- if (isMetricsEnabled) {
- metric.increment();
- }
- }
-
- // exposed for testing.
- public Metric getMetric() {
- return metric;
- }
-
- @Override
- public String toString() {
- return metric.toString();
- }
- }
-
- public static Collection<Metric> getMetrics() {
- List<Metric> metrics = new ArrayList<>();
- for (SizeMetric s : SizeMetric.values()) {
- metrics.add(s.metric);
- }
- for (CountMetric s : CountMetric.values()) {
- metrics.add(s.metric);
- }
- return metrics;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
new file mode 100644
index 0000000..e6c6be2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nonnull;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Queue of all metrics associated with performing reads from the cluster.
+ */
+public class ReadMetricQueue {
+
+ private static final int MAX_QUEUE_SIZE = 20000; // TODO: should this be configurable?
+
+ private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>();
+
+ private final boolean isRequestMetricsEnabled;
+
+ public ReadMetricQueue(boolean isRequestMetricsEnabled) {
+ this.isRequestMetricsEnabled = isRequestMetricsEnabled;
+ }
+
+ public CombinableMetric allotMetric(MetricType type, String tableName) {
+ if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
+ MetricKey key = new MetricKey(type, tableName);
+ Queue<CombinableMetric> q = getMetricQueue(key);
+ CombinableMetric metric = getMetric(type);
+ q.offer(metric);
+ return metric;
+ }
+
+ @VisibleForTesting
+ public CombinableMetric getMetric(MetricType type) {
+ CombinableMetric metric = new CombinableMetricImpl(type);
+ return metric;
+ }
+
+ /**
+ * @return map of table name -> list of pair of (metric name, metric value)
+ */
+ public Map<String, Map<String, Long>> aggregate() {
+ Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+ for (Entry<MetricKey, Queue<CombinableMetric>> entry : metricsMap.entrySet()) {
+ String tableNameToPublish = entry.getKey().tableName;
+ Collection<CombinableMetric> metrics = entry.getValue();
+ if (metrics.size() > 0) {
+ CombinableMetric m = combine(metrics);
+ Map<String, Long> map = publishedMetrics.get(tableNameToPublish);
+ if (map == null) {
+ map = new HashMap<>();
+ publishedMetrics.put(tableNameToPublish, map);
+ }
+ map.put(m.getName(), m.getValue());
+ }
+ }
+ return publishedMetrics;
+ }
+
+ public void clearMetrics() {
+ metricsMap.clear(); // help gc
+ }
+
+ private static CombinableMetric combine(Collection<CombinableMetric> metrics) {
+ int size = metrics.size();
+ if (size == 0) { throw new IllegalArgumentException("Metrics collection needs to have at least one element"); }
+ Iterator<CombinableMetric> itr = metrics.iterator();
+ CombinableMetric combinedMetric = itr.next();
+ while (itr.hasNext()) {
+ combinedMetric = combinedMetric.combine(itr.next());
+ }
+ return combinedMetric;
+ }
+
+ /**
+ * Combine the metrics. This method should only be called in a single threaded manner when the two metric holders
+ * are not getting modified.
+ */
+ public ReadMetricQueue combineReadMetrics(ReadMetricQueue other) {
+ ConcurrentMap<MetricKey, Queue<CombinableMetric>> otherMetricsMap = other.metricsMap;
+ for (Entry<MetricKey, Queue<CombinableMetric>> entry : otherMetricsMap.entrySet()) {
+ MetricKey key = entry.getKey();
+ Queue<CombinableMetric> otherQueue = entry.getValue();
+ CombinableMetric combinedMetric = null;
+ // combine the metrics corresponding to this metric key before putting it in the queue.
+ for (CombinableMetric m : otherQueue) {
+ if (combinedMetric == null) {
+ combinedMetric = m;
+ } else {
+ combinedMetric.combine(m);
+ }
+ }
+ if (combinedMetric != null) {
+ Queue<CombinableMetric> thisQueue = getMetricQueue(key);
+ thisQueue.offer(combinedMetric);
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Inner class whose instances are used as keys in the metrics map.
+ */
+ private static class MetricKey {
+ @Nonnull
+ private final MetricType type;
+
+ @Nonnull
+ private final String tableName;
+
+ MetricKey(MetricType type, String tableName) {
+ checkNotNull(type);
+ checkNotNull(tableName);
+ this.type = type;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + tableName.hashCode();
+ result = prime * result + type.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ MetricKey other = (MetricKey)obj;
+ if (tableName.equals(other.tableName) && type == other.type) return true;
+ return false;
+ }
+
+ }
+
+ private Queue<CombinableMetric> getMetricQueue(MetricKey key) {
+ Queue<CombinableMetric> q = metricsMap.get(key);
+ if (q == null) {
+ q = new LinkedBlockingQueue<CombinableMetric>(MAX_QUEUE_SIZE);
+ Queue<CombinableMetric> curQ = metricsMap.putIfAbsent(key, q);
+ if (curQ != null) {
+ q = curQ;
+ }
+ }
+ return q;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
deleted file mode 100644
index 9eca754..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- *
- * Statistic that keeps track of the sum of long values that
- * could be used to represent a phoenix metric. For performance
- * reasons the internal state in this metric is not strictly covariant
- * and hence should only be used for monitoring and debugging purposes.
- */
-class SizeStatistic implements Metric {
-
- private final AtomicLong total = new AtomicLong(0);
- private final AtomicLong numSamples = new AtomicLong(0);
- private final String name;
- private final String description;
-
- public SizeStatistic(String name, String description) {
- this.name = name;
- this.description = description;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-
- @Override
- public void reset() {
- total.set(0);
- numSamples.set(0);
- }
-
- @Override
- public String getCurrentMetricState() {
- return "Name:" + description + ", Total: " + total.get() + ", Number of samples: " + numSamples.get();
- }
-
- @Override
- public long getNumberOfSamples() {
- return numSamples.get();
- }
-
- @Override
- public long getTotalSum() {
- return total.get();
- }
-
- public long add(long value) {
- // there is a race condition here but what the heck.
- numSamples.incrementAndGet();
- return total.addAndGet(value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
new file mode 100644
index 0000000..4373887
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+/**
+ * Class that encapsulates the various metrics associated with the spooling done by phoenix as part of servicing a
+ * request.
+ */
+public class SpoolingMetricsHolder {
+
+ private final CombinableMetric spoolFileSizeMetric;
+ private final CombinableMetric numSpoolFileMetric;
+ public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), "");
+
+ public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+ this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);
+ this.numSpoolFileMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_COUNTER, tableName);
+ }
+
+ public CombinableMetric getSpoolFileSizeMetric() {
+ return spoolFileSizeMetric;
+ }
+
+ public CombinableMetric getNumSpoolFileMetric() {
+ return numSpoolFileMetric;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
new file mode 100644
index 0000000..98ff57c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+
+
+/**
+ * Class to encapsulate the various metrics associated with submitting and executing a task to the phoenix client
+ * thread pool.
+ */
+public class TaskExecutionMetricsHolder {
+
+ private final CombinableMetric taskQueueWaitTime;
+ private final CombinableMetric taskEndToEndTime;
+ private final CombinableMetric taskExecutionTime;
+ private final CombinableMetric numTasks;
+ private final CombinableMetric numRejectedTasks;
+ public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), "");
+
+ public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+ taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName);
+ taskEndToEndTime = readMetrics.allotMetric(TASK_END_TO_END_TIME, tableName);
+ taskExecutionTime = readMetrics.allotMetric(TASK_EXECUTION_TIME, tableName);
+ numTasks = readMetrics.allotMetric(TASK_EXECUTED_COUNTER, tableName);
+ numRejectedTasks = readMetrics.allotMetric(TASK_REJECTED_COUNTER, tableName);
+ }
+
+ public CombinableMetric getTaskQueueWaitTime() {
+ return taskQueueWaitTime;
+ }
+
+ public CombinableMetric getTaskEndToEndTime() {
+ return taskEndToEndTime;
+ }
+
+ public CombinableMetric getTaskExecutionTime() {
+ return taskExecutionTime;
+ }
+
+ public CombinableMetric getNumTasks() {
+ return numTasks;
+ }
+
+ public CombinableMetric getNumRejectedTasks() {
+ return numRejectedTasks;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index 898a919..c16b86d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -45,7 +45,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
options.getKeepAliveMs(),
options.getThreadPoolSize(),
options.getQueueSize(),
- options.isMetricsEnabled());
+ options.isGlobalMetricsEnabled());
this.memoryManager = new GlobalMemoryManager(
Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100,
options.getMaxMemoryWaitMs());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 3e7d084..825cc83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -157,7 +157,7 @@ public interface QueryServices extends SQLCloseable {
public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
- public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled";
+ public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
// rpc queue configs
public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
@@ -165,6 +165,7 @@ public interface QueryServices extends SQLCloseable {
public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
+ public static final String COLLECT_REQUEST_LEVEL_METRICS = "phoenix.query.request.metrics.enabled";
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 02c695e..4e8879b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -18,15 +18,16 @@
package org.apache.phoenix.query;
import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
-import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_METRICS_ENABLED;
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
@@ -43,7 +44,6 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
@@ -187,13 +187,14 @@ public class QueryServicesOptions {
// TODO Change this to true as part of PHOENIX-1543
public static final boolean DEFAULT_AUTO_COMMIT = false;
- public static final boolean DEFAULT_IS_METRICS_ENABLED = true;
+ public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
+ public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false;
private final Configuration config;
@@ -246,10 +247,11 @@ public class QueryServicesOptions {
.setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
.setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
.setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
- .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
+ .setIfUnset(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED)
.setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
.setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
- .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER);
+ .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
+ .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, DEFAULT_REQUEST_LEVEL_METRICS_ENABLED)
;
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
@@ -445,10 +447,10 @@ public class QueryServicesOptions {
return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
}
- public boolean isMetricsEnabled() {
- return config.getBoolean(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
+ public boolean isGlobalMetricsEnabled() {
+ return config.getBoolean(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED);
}
-
+
public boolean isUseByteBasedRegex() {
return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX);
}
@@ -526,11 +528,7 @@ public class QueryServicesOptions {
return this;
}
- public QueryServicesOptions setMetricsEnabled(boolean flag) {
- config.setBoolean(METRICS_ENABLED, flag);
- return this;
- }
-
+
public QueryServicesOptions setUseByteBasedRegex(boolean flag) {
config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag);
return this;
@@ -540,4 +538,5 @@ public class QueryServicesOptions {
config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder);
return this;
}
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index 265fc78..159e0c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -17,12 +17,23 @@
*/
package org.apache.phoenix.trace;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.commons.configuration.Configuration;
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,20 +42,15 @@ import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.phoenix.metrics.*;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.QueryUtil;
-import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
/**
* Write the metrics to a phoenix table.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 06534d1..cbe5a1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -154,4 +154,9 @@ public class JDBCUtil {
}
return Boolean.valueOf(autoCommit);
}
+
+ public static boolean isCollectingRequestLevelMetricsEnabled(String url, Properties overrideProps, ReadOnlyProps queryServicesProps) throws SQLException {
+ String batchSizeStr = findProperty(url, overrideProps, PhoenixRuntime.REQUEST_METRIC_ATTRIB);
+ return (batchSizeStr == null ? queryServicesProps.getBoolean(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, QueryServicesOptions.DEFAULT_REQUEST_LEVEL_METRICS_ENABLED) : Boolean.parseBoolean(batchSizeStr));
+ }
}