You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/03 18:30:49 UTC

[5/7] calcite git commit: [CALCITE-1119] Additional metrics instrumentation for request processing

[CALCITE-1119] Additional metrics instrumentation for request processing


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/aecefef8
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/aecefef8
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/aecefef8

Branch: refs/heads/master
Commit: aecefef8a4420e780737a8f49b72fd146679f2e2
Parents: 1d3a26d
Author: Josh Elser <el...@apache.org>
Authored: Wed Mar 2 17:43:46 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Mar 2 23:26:08 2016 -0500

----------------------------------------------------------------------
 .../calcite/avatica/remote/LocalService.java    | 139 ++++++++++++-------
 1 file changed, 89 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/aecefef8/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index 11d15c9..c070ec0 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -17,25 +17,54 @@
 package org.apache.calcite.avatica.remote;
 
 import org.apache.calcite.avatica.Meta;
+
 import org.apache.calcite.avatica.MetaImpl;
 import org.apache.calcite.avatica.MissingResultsException;
 import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
+
 /**
  * Implementation of {@link Service} that talks to a local {@link Meta}.
  */
 public class LocalService implements Service {
   final Meta meta;
+  final MetricsSystem metrics;
+
+  private final Timer executeTimer;
+  private final Timer commitTimer;
+  private final Timer prepareTimer;
+  private final Timer prepareAndExecuteTimer;
+  private final Timer connectionSyncTimer;
 
   private RpcMetadataResponse serverLevelRpcMetadata;
 
   public LocalService(Meta meta) {
+    this(meta, NoopMetricsSystem.getInstance());
+  }
+
+  public LocalService(Meta meta, MetricsSystem metrics) {
     this.meta = meta;
+    this.metrics = Objects.requireNonNull(metrics);
+
+    this.executeTimer = this.metrics.getTimer(name("Execute"));
+    this.commitTimer = this.metrics.getTimer(name("Commit"));
+    this.prepareTimer = this.metrics.getTimer(name("Prepare"));
+    this.prepareAndExecuteTimer = this.metrics.getTimer(name("PrepareAndExecute"));
+    this.connectionSyncTimer = this.metrics.getTimer(name("ConnectionSync"));
+  }
+
+  private static String name(String timer) {
+    return concat(LocalService.class, timer);
   }
 
   @Override public void setRpcMetadata(RpcMetadataResponse serverLevelRpcMetadata) {
@@ -172,42 +201,46 @@ public class LocalService implements Service {
   }
 
   public PrepareResponse apply(PrepareRequest request) {
-    final Meta.ConnectionHandle ch =
-        new Meta.ConnectionHandle(request.connectionId);
-    final Meta.StatementHandle h =
-        meta.prepare(ch, request.sql, request.maxRowCount);
-    return new PrepareResponse(h, serverLevelRpcMetadata);
+    try (final Context ctx = prepareTimer.start()) {
+      final Meta.ConnectionHandle ch =
+          new Meta.ConnectionHandle(request.connectionId);
+      final Meta.StatementHandle h =
+          meta.prepare(ch, request.sql, request.maxRowCount);
+      return new PrepareResponse(h, serverLevelRpcMetadata);
+    }
   }
 
   public ExecuteResponse apply(PrepareAndExecuteRequest request) {
-    final Meta.StatementHandle sh =
-        new Meta.StatementHandle(request.connectionId, request.statementId, null);
-    try {
-      final Meta.ExecuteResult executeResult =
-          meta.prepareAndExecute(sh, request.sql, request.maxRowCount,
-              new Meta.PrepareCallback() {
-                @Override public Object getMonitor() {
-                  return LocalService.class;
-                }
-
-                @Override public void clear() {
-                }
-
-                @Override public void assign(Meta.Signature signature,
-                    Meta.Frame firstFrame, long updateCount) {
-                }
-
-                @Override public void execute() {
-                }
-              });
-      final List<ResultSetResponse> results = new ArrayList<>();
-      for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
-        results.add(toResponse(metaResultSet));
+    try (final Context ctx = prepareAndExecuteTimer.start()) {
+      final Meta.StatementHandle sh =
+          new Meta.StatementHandle(request.connectionId, request.statementId, null);
+      try {
+        final Meta.ExecuteResult executeResult =
+            meta.prepareAndExecute(sh, request.sql, request.maxRowCount,
+                new Meta.PrepareCallback() {
+                  @Override public Object getMonitor() {
+                    return LocalService.class;
+                  }
+
+                  @Override public void clear() {
+                  }
+
+                  @Override public void assign(Meta.Signature signature,
+                      Meta.Frame firstFrame, long updateCount) {
+                  }
+
+                  @Override public void execute() {
+                  }
+                });
+        final List<ResultSetResponse> results = new ArrayList<>();
+        for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
+          results.add(toResponse(metaResultSet));
+        }
+        return new ExecuteResponse(results, false, serverLevelRpcMetadata);
+      } catch (NoSuchStatementException e) {
+        // The Statement doesn't exist anymore, bubble up this information
+        return new ExecuteResponse(null, true, serverLevelRpcMetadata);
       }
-      return new ExecuteResponse(results, false, serverLevelRpcMetadata);
-    } catch (NoSuchStatementException e) {
-      // The Statement doesn't exist anymore, bubble up this information
-      return new ExecuteResponse(null, true, serverLevelRpcMetadata);
     }
   }
 
@@ -229,17 +262,19 @@ public class LocalService implements Service {
   }
 
   public ExecuteResponse apply(ExecuteRequest request) {
-    try {
-      final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle,
-          request.parameterValues, request.maxRowCount);
-
-      final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size());
-      for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
-        results.add(toResponse(metaResultSet));
+    try (final Context ctx = executeTimer.start()) {
+      try {
+        final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle,
+            request.parameterValues, request.maxRowCount);
+
+        final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size());
+        for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
+          results.add(toResponse(metaResultSet));
+        }
+        return new ExecuteResponse(results, false, serverLevelRpcMetadata);
+      } catch (NoSuchStatementException e) {
+        return new ExecuteResponse(null, true, serverLevelRpcMetadata);
       }
-      return new ExecuteResponse(results, false, serverLevelRpcMetadata);
-    } catch (NoSuchStatementException e) {
-      return new ExecuteResponse(null, true, serverLevelRpcMetadata);
     }
   }
 
@@ -272,11 +307,13 @@ public class LocalService implements Service {
   }
 
   public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
-    final Meta.ConnectionHandle ch =
-        new Meta.ConnectionHandle(request.connectionId);
-    final Meta.ConnectionProperties connProps =
-        meta.connectionSync(ch, request.connProps);
-    return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata);
+    try (final Context ctx = connectionSyncTimer.start()) {
+      final Meta.ConnectionHandle ch =
+          new Meta.ConnectionHandle(request.connectionId);
+      final Meta.ConnectionProperties connProps =
+          meta.connectionSync(ch, request.connProps);
+      return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata);
+    }
   }
 
   public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
@@ -302,10 +339,12 @@ public class LocalService implements Service {
   }
 
   public CommitResponse apply(CommitRequest request) {
-    meta.commit(new Meta.ConnectionHandle(request.connectionId));
+    try (final Context ctx = commitTimer.start()) {
+      meta.commit(new Meta.ConnectionHandle(request.connectionId));
 
-    // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception.
-    return new CommitResponse();
+      // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception.
+      return new CommitResponse();
+    }
   }
 
   public RollbackResponse apply(RollbackRequest request) {