You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/24 19:49:24 UTC

[2/3] calcite git commit: [CALCITE-1159] Kerberos-based client authentication via SPNEGO

[CALCITE-1159] Kerberos-based client authentication via SPNEGO

A large set of changes, spanning both new APIs for creating
the Avatica HTTP server and new options for clients to use
the new authentication mechanism. Tests are included that
verify end-to-end SPNEGO-based authentication with Kerberos.

Parallel test running has been removed as it has been
problematic with both HSQLDB and now Kerby (the in-memory
KDC implementation).

Docs have also been updated to include details on how downstream
users should create servers and clients as well as RPC-level details.

Closes apache/calcite#214


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/406372f1
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/406372f1
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/406372f1

Branch: refs/heads/master
Commit: 406372f1274a6a0c9fe2b471ce6d65669e798633
Parents: 5dfa3f1
Author: Josh Elser <el...@apache.org>
Authored: Mon Mar 14 10:55:11 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 24 14:28:32 2016 -0400

----------------------------------------------------------------------
 .../avatica/BuiltInConnectionProperty.java      |   3 +
 .../calcite/avatica/ConnectionConfig.java       |   2 +
 .../calcite/avatica/ConnectionConfigImpl.java   |   4 +
 .../remote/AvaticaCommonsHttpClientImpl.java    |  23 +-
 .../AvaticaCommonsHttpClientSpnegoImpl.java     | 180 +++++++++
 .../remote/AvaticaHttpClientFactoryImpl.java    |   9 +-
 .../avatica/remote/ProtobufTranslationImpl.java |  32 +-
 .../avatica/remote/RemoteProtobufService.java   |  17 +-
 .../apache/calcite/avatica/remote/Service.java  |   5 +-
 avatica/pom.xml                                 |  24 +-
 avatica/server/pom.xml                          |  20 +-
 .../avatica/server/AbstractAvaticaHandler.java  |  70 ++++
 .../avatica/server/AuthenticationType.java      |  27 ++
 .../avatica/server/AvaticaJsonHandler.java      |  42 +-
 .../avatica/server/AvaticaProtobufHandler.java  |  39 +-
 .../server/AvaticaServerConfiguration.java      |  67 ++++
 .../avatica/server/DoAsRemoteUserCallback.java  |  42 ++
 .../calcite/avatica/server/HandlerFactory.java  |  49 ++-
 .../calcite/avatica/server/HttpServer.java      | 389 ++++++++++++++++++-
 .../server/PropertyBasedSpnegoLoginService.java |  50 +++
 .../calcite/avatica/AvaticaSpnegoTest.java      | 227 +++++++++++
 .../apache/calcite/avatica/SpnegoTestUtil.java  | 208 ++++++++++
 .../server/AbstractAvaticaHandlerTest.java      |  99 +++++
 .../server/HttpServerSpnegoWithJaasTest.java    | 227 +++++++++++
 .../server/HttpServerSpnegoWithoutJaasTest.java | 218 +++++++++++
 .../server/src/test/resources/log4j.properties  |   4 +
 avatica/site/_data/docs.yml                     |   2 +
 avatica/site/_docs/client_reference.md          | 108 +++++
 avatica/site/_docs/security.md                  | 145 +++++++
 29 files changed, 2287 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java b/avatica/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
index 086ae4a..3562f25 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
@@ -44,6 +44,9 @@ public enum BuiltInConnectionProperty implements ConnectionProperty {
   /** Serialization used over remote connections */
   SERIALIZATION("serialization", Type.STRING, "json", false),
 
+  /** The type of authentication to be used */
+  AUTHENTICATION("authentication", Type.STRING, null, false),
+
   /** Factory for constructing http clients. */
   HTTP_CLIENT_FACTORY("httpclient_factory", Type.PLUGIN,
       AvaticaHttpClientFactoryImpl.class.getName(), false),

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java b/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
index 8e4790c..5a6324c 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
@@ -33,6 +33,8 @@ public interface ConnectionConfig {
   String url();
   /** @see BuiltInConnectionProperty#SERIALIZATION */
   String serialization();
+  /** @see BuiltInConnectionProperty#AUTHENTICATION */
+  String authentication();
   AvaticaHttpClientFactory httpClientFactory();
   String httpClientClass();
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
index bbeefea..cd0325d 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
@@ -52,6 +52,10 @@ public class ConnectionConfigImpl implements ConnectionConfig {
     return BuiltInConnectionProperty.SERIALIZATION.wrap(properties).getString();
   }
 
+  public String authentication() {
+    return BuiltInConnectionProperty.AUTHENTICATION.wrap(properties).getString();
+  }
+
   public AvaticaHttpClientFactory httpClientFactory() {
     return BuiltInConnectionProperty.HTTP_CLIENT_FACTORY.wrap(properties)
         .getPlugin(AvaticaHttpClientFactory.class, null);

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
index 3bda248..9cd678e 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
@@ -20,6 +20,7 @@ import org.apache.http.ConnectionReuseStrategy;
 import org.apache.http.HttpClientConnection;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
+import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.client.protocol.RequestExpectContinue;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
@@ -28,7 +29,6 @@ import org.apache.http.impl.pool.BasicConnFactory;
 import org.apache.http.impl.pool.BasicConnPool;
 import org.apache.http.impl.pool.BasicPoolEntry;
 import org.apache.http.message.BasicHttpEntityEnclosingRequest;
-import org.apache.http.protocol.HttpCoreContext;
 import org.apache.http.protocol.HttpProcessor;
 import org.apache.http.protocol.HttpProcessorBuilder;
 import org.apache.http.protocol.HttpRequestExecutor;
@@ -94,8 +94,9 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient {
       BasicPoolEntry entry = null;
       try {
         entry = future.get();
-        HttpCoreContext coreContext = HttpCoreContext.create();
-        coreContext.setTargetHost(host);
+        HttpClientContext context = HttpClientContext.create();
+
+        context.setTargetHost(host);
 
         HttpClientConnection conn = entry.getConnection();
 
@@ -105,12 +106,12 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient {
             new BasicHttpEntityEnclosingRequest("POST", "/");
         postRequest.setEntity(entity);
 
-        httpExecutor.preProcess(postRequest, httpProcessor, coreContext);
-        HttpResponse response = httpExecutor.execute(postRequest, conn, coreContext);
-        httpExecutor.postProcess(response, httpProcessor, coreContext);
+        httpExecutor.preProcess(postRequest, httpProcessor, context);
+        HttpResponse response = httpExecutor.execute(postRequest, conn, context);
+        httpExecutor.postProcess(response, httpProcessor, context);
 
         // Should the connection be kept alive?
-        reusable = REUSE.keepAlive(response, coreContext);
+        reusable = REUSE.keepAlive(response, context);
 
         final int statusCode = response.getStatusLine().getStatusCode();
         if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) {
@@ -118,7 +119,13 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient {
           continue;
         }
 
-        return EntityUtils.toByteArray(response.getEntity());
+        // HTTP-200 and HTTP-500 should both contain Avatica messages.
+        if (HttpURLConnection.HTTP_OK == statusCode
+            || HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
+          return EntityUtils.toByteArray(response.getEntity());
+        }
+
+        throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode);
       } catch (Exception e) {
         LOG.debug("Failed to execute HTTP request", e);
         throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientSpnegoImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientSpnegoImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientSpnegoImpl.java
new file mode 100644
index 0000000..c1ca658
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientSpnegoImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.KerberosCredentials;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Lookup;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.BasicAuthCache;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.security.Principal;
+import java.util.Objects;
+
+/**
+ * Implementation of an AvaticaHttpClient which uses SPNEGO.
+ */
+public class AvaticaCommonsHttpClientSpnegoImpl implements AvaticaHttpClient {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AvaticaCommonsHttpClientSpnegoImpl.class);
+
+  public static final String CACHED_CONNECTIONS_MAX_KEY = "avatica.http.spnego.max_cached";
+  public static final String CACHED_CONNECTIONS_MAX_DEFAULT = "100";
+  public static final String CACHED_CONNECTIONS_MAX_PER_ROUTE_KEY =
+      "avatica.http.spnego.max_per_route";
+  public static final String CACHED_CONNECTIONS_MAX_PER_ROUTE_DEFAULT = "25";
+
+  private static final boolean USE_CANONICAL_HOSTNAME = true;
+  private static final boolean STRIP_PORT_ON_SERVER_LOOKUP = true;
+
+  final URL url;
+  final HttpHost host;
+  final PoolingHttpClientConnectionManager pool;
+  final Lookup<AuthSchemeProvider> authRegistry;
+  final BasicCredentialsProvider credentialsProvider;
+  final BasicAuthCache authCache;
+  final CloseableHttpClient client;
+
+  /**
+   * Constructs an http client with the expectation that the user is already logged in with their
+   * Kerberos identity via JAAS.
+   *
+   * @param url The URL for the Avatica server
+   */
+  public AvaticaCommonsHttpClientSpnegoImpl(URL url) {
+    this(url, null);
+  }
+
+  /**
+   * Constructs an HTTP client with user specified by the given credentials.
+   *
+   * @param url The URL for the Avatica server
+   * @param credential The GSS credentials
+   */
+  public AvaticaCommonsHttpClientSpnegoImpl(URL url, GSSCredential credential) {
+    this.url = Objects.requireNonNull(url);
+
+    pool = new PoolingHttpClientConnectionManager();
+    // Increase max total connection to 100
+    final String maxCnxns =
+        System.getProperty(CACHED_CONNECTIONS_MAX_KEY, CACHED_CONNECTIONS_MAX_DEFAULT);
+    pool.setMaxTotal(Integer.parseInt(maxCnxns));
+    // Increase default max connection per route to 25
+    final String maxCnxnsPerRoute = System.getProperty(CACHED_CONNECTIONS_MAX_PER_ROUTE_KEY,
+        CACHED_CONNECTIONS_MAX_PER_ROUTE_DEFAULT);
+    pool.setDefaultMaxPerRoute(Integer.parseInt(maxCnxnsPerRoute));
+
+    this.host = new HttpHost(url.getHost(), url.getPort());
+
+    this.authRegistry = RegistryBuilder.<AuthSchemeProvider>create().register(AuthSchemes.SPNEGO,
+        new SPNegoSchemeFactory(STRIP_PORT_ON_SERVER_LOOKUP, USE_CANONICAL_HOSTNAME)).build();
+
+    this.credentialsProvider = new BasicCredentialsProvider();
+    if (null != credential) {
+      // Non-null credential should be used directly with KerberosCredentials.
+      this.credentialsProvider.setCredentials(AuthScope.ANY, new KerberosCredentials(credential));
+    } else {
+      // A null credential implies that the user is logged in via JAAS using the
+      // java.security.auth.login.config system property
+      this.credentialsProvider.setCredentials(AuthScope.ANY, EmptyCredentials.INSTANCE);
+    }
+
+    this.authCache = new BasicAuthCache();
+
+    // A single thread-safe HttpClient, pooling connections via the ConnectionManager
+    this.client = HttpClients.custom()
+        .setDefaultAuthSchemeRegistry(authRegistry)
+        .setConnectionManager(pool).build();
+  }
+
+  @Override public byte[] send(byte[] request) {
+    HttpClientContext context = HttpClientContext.create();
+
+    context.setTargetHost(host);
+    context.setCredentialsProvider(credentialsProvider);
+    context.setAuthSchemeRegistry(authRegistry);
+    context.setAuthCache(authCache);
+
+    ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);
+
+    // Build the POST request that carries the serialized message
+    HttpPost post = new HttpPost(toURI(url));
+    post.setEntity(entity);
+
+    try (CloseableHttpResponse response = client.execute(post, context)) {
+      final int statusCode = response.getStatusLine().getStatusCode();
+      if (HttpURLConnection.HTTP_OK == statusCode
+          || HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
+        return EntityUtils.toByteArray(response.getEntity());
+      }
+
+      throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.debug("Failed to execute HTTP request", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static URI toURI(URL url) throws RuntimeException {
+    try {
+      return url.toURI();
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A credentials implementation which returns null.
+   */
+  private static class EmptyCredentials implements Credentials {
+    public static final EmptyCredentials INSTANCE = new EmptyCredentials();
+
+    @Override public String getPassword() {
+      return null;
+    }
+    @Override public Principal getUserPrincipal() {
+      return null;
+    }
+  }
+}
+
+// End AvaticaCommonsHttpClientSpnegoImpl.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
index cd6cbce..e93d9da 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
@@ -29,6 +29,8 @@ import java.util.Objects;
 public class AvaticaHttpClientFactoryImpl implements AvaticaHttpClientFactory {
   public static final String HTTP_CLIENT_IMPL_DEFAULT =
       AvaticaCommonsHttpClientImpl.class.getName();
+  public static final String SPNEGO_HTTP_CLIENT_IMPL_DEFAULT =
+      AvaticaCommonsHttpClientSpnegoImpl.class.getName();
 
   // Public for Type.PLUGIN
   public static final AvaticaHttpClientFactoryImpl INSTANCE = new AvaticaHttpClientFactoryImpl();
@@ -48,7 +50,12 @@ public class AvaticaHttpClientFactoryImpl implements AvaticaHttpClientFactory {
   @Override public AvaticaHttpClient getClient(URL url, ConnectionConfig config) {
     String className = config.httpClientClass();
     if (null == className) {
-      className = HTTP_CLIENT_IMPL_DEFAULT;
+      // Provide an implementation that works with SPNEGO if that's the authentication in use.
+      if ("SPNEGO".equalsIgnoreCase(config.authentication())) {
+        className = SPNEGO_HTTP_CLIENT_IMPL_DEFAULT;
+      } else {
+        className = HTTP_CLIENT_IMPL_DEFAULT;
+      }
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
index b9c57c5..80a5f81 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
@@ -63,6 +63,10 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -79,6 +83,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * protobuf requests to POJO requests.
  */
 public class ProtobufTranslationImpl implements ProtobufTranslation {
+  private static final Logger LOG = LoggerFactory.getLogger(ProtobufTranslationImpl.class);
 
   // Extremely ugly mapping of PB class name into a means to convert it to the POJO
   private static final Map<String, RequestTranslator> REQUEST_PARSERS;
@@ -257,7 +262,7 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
 
     RequestTranslator translator = REQUEST_PARSERS.get(className);
     if (null == translator) {
-      throw new IllegalArgumentException("Cannot find parser for " + className);
+      throw new IllegalArgumentException("Cannot find request parser for " + className);
     }
 
     return translator;
@@ -278,7 +283,7 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
 
     ResponseTranslator translator = RESPONSE_PARSERS.get(className);
     if (null == translator) {
-      throw new IllegalArgumentException("Cannot find parser for " + className);
+      throw new IllegalArgumentException("Cannot find response parser for " + className);
     }
 
     return translator;
@@ -289,6 +294,7 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
     UnsynchronizedBuffer out = threadLocalBuffer.get();
     try {
       Message responseMsg = response.serialize();
+      LOG.trace("Serializing response '{}'", TextFormat.shortDebugString(responseMsg));
       serializeMessage(out, responseMsg);
       return out.toArray();
     } finally {
@@ -301,6 +307,7 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
     UnsynchronizedBuffer out = threadLocalBuffer.get();
     try {
       Message requestMsg = request.serialize();
+      LOG.trace("Serializing request '{}'", TextFormat.shortDebugString(requestMsg));
       serializeMessage(out, requestMsg);
       return out.toArray();
     } finally {
@@ -345,10 +352,16 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
     WireMessage wireMsg = WireMessage.parseFrom(inputStream);
 
     String serializedMessageClassName = wireMsg.getName();
-    RequestTranslator translator = getParserForRequest(serializedMessageClassName);
 
-    // The ByteString should be logical offsets into the original byte array
-    return translator.transform(wireMsg.getWrappedMessage());
+    try {
+      RequestTranslator translator = getParserForRequest(serializedMessageClassName);
+
+      // The ByteString should be logical offsets into the original byte array
+      return translator.transform(wireMsg.getWrappedMessage());
+    } catch (RuntimeException e) {
+      LOG.debug("Failed to parse request message '{}'", TextFormat.shortDebugString(wireMsg));
+      throw e;
+    }
   }
 
   @Override public Response parseResponse(byte[] bytes) throws IOException {
@@ -360,9 +373,14 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
     WireMessage wireMsg = WireMessage.parseFrom(inputStream);
 
     String serializedMessageClassName = wireMsg.getName();
-    ResponseTranslator translator = getParserForResponse(serializedMessageClassName);
+    try {
+      ResponseTranslator translator = getParserForResponse(serializedMessageClassName);
 
-    return translator.transform(wireMsg.getWrappedMessage());
+      return translator.transform(wireMsg.getWrappedMessage());
+    } catch (RuntimeException e) {
+      LOG.debug("Failed to parse response message '{}'", TextFormat.shortDebugString(wireMsg));
+      throw e;
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
index 828513a..2e92662 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
@@ -16,13 +16,19 @@
  */
 package org.apache.calcite.avatica.remote;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 
+
 /**
  * ProtobufService implementation that queries against a remote implementation, using
  * protocol buffers as the serialized form.
  */
 public class RemoteProtobufService extends ProtobufService {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteProtobufService.class);
+
   private final AvaticaHttpClient client;
   private final ProtobufTranslation translation;
 
@@ -33,10 +39,19 @@ public class RemoteProtobufService extends ProtobufService {
 
   @Override public Response _apply(Request request) {
     final Response resp;
+    byte[] response = null;
+    try {
+      response = client.send(translation.serializeRequest(request));
+    } catch (IOException e) {
+      LOG.debug("Failed to execute remote request: {}", request);
+      // Failed to get a response from the server for the request.
+      throw new RuntimeException(e);
+    }
+
     try {
-      byte[] response = client.send(translation.serializeRequest(request));
       resp = translation.parseResponse(response);
     } catch (IOException e) {
+      LOG.debug("Failed to deserialize reponse to {}. '{}'", request, new String(response));
       // Not a protobuf that we could parse.
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java
index 078e63e..2ead933 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -2216,8 +2216,10 @@ public interface Service {
 
     public static final int UNKNOWN_ERROR_CODE = -1;
     public static final int MISSING_CONNECTION_ERROR_CODE = 1;
+    public static final int UNAUTHORIZED_ERROR_CODE = 2;
 
     public static final String UNKNOWN_SQL_STATE = "00000";
+    public static final String UNAUTHORIZED_SQL_STATE = "00002";
 
     public final List<String> exceptions;
     public final String errorMessage;
@@ -2318,7 +2320,8 @@ public interface Service {
           metadata);
     }
 
-    @Override Responses.ErrorResponse serialize() {
+    // Public so the Jetty handler implementations can use it
+    @Override public Responses.ErrorResponse serialize() {
       Responses.ErrorResponse.Builder builder = Responses.ErrorResponse.newBuilder();
 
       if (null != rpcMetadata) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/pom.xml b/avatica/pom.xml
index 938b3a5..c97cdb3 100644
--- a/avatica/pom.xml
+++ b/avatica/pom.xml
@@ -77,6 +77,7 @@ limitations under the License.
     <jcip-annotations.version>1.0-1</jcip-annotations.version>
     <jetty.version>9.2.15.v20160210</jetty.version>
     <junit.version>4.12</junit.version>
+    <kerby.version>1.0.0-RC1</kerby.version>
     <maven-checkstyle-plugin.version>2.12.1</maven-checkstyle-plugin.version>
     <maven-dependency-plugin.version>2.10</maven-dependency-plugin.version>
 
@@ -204,6 +205,21 @@ limitations under the License.
         <version>${scott-data-hsqldb.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.kerby</groupId>
+        <artifactId>kerb-client</artifactId>
+        <version>${kerby.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kerby</groupId>
+        <artifactId>kerb-core</artifactId>
+        <version>${kerby.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kerby</groupId>
+        <artifactId>kerb-simplekdc</artifactId>
+        <version>${kerby.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-lang3</artifactId>
         <version>${commons-lang3.version}</version>
@@ -240,6 +256,11 @@ limitations under the License.
       </dependency>
       <dependency>
         <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-security</artifactId>
+        <version>${jetty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-server</artifactId>
         <version>${jetty.version}</version>
       </dependency>
@@ -534,9 +555,6 @@ limitations under the License.
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
           <configuration>
-            <threadCount>1</threadCount>
-            <perCoreThreadCount>true</perCoreThreadCount>
-            <parallel>both</parallel>
             <argLine>-Xmx1536m -XX:MaxPermSize=256m -Duser.timezone=${user.timezone}</argLine>
           </configuration>
         </plugin>

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/server/pom.xml b/avatica/server/pom.xml
index 9bb841e..d24aaf2 100644
--- a/avatica/server/pom.xml
+++ b/avatica/server/pom.xml
@@ -53,6 +53,10 @@ limitations under the License.
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-security</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
     </dependency>
     <dependency>
@@ -79,6 +83,21 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerb-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerb-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerb-simplekdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
@@ -157,7 +176,6 @@ limitations under the License.
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
-        <version>${maven-dependency-plugin.version}</version>
         <!-- configurations do not cascade, so all of the definition from
              ../pom.xml:build:plugin-management:plugins:plugin must be repeated in child poms -->
         <executions>

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
new file mode 100644
index 0000000..233b36a
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.remote.Service.ErrorResponse;
+
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Base-class for Avatica implemented Jetty Handlers.
+ */
+public abstract class AbstractAvaticaHandler extends AbstractHandler
+    implements MetricsAwareAvaticaHandler {
+
+  private static final ErrorResponse UNAUTHORIZED_ERROR = new ErrorResponse(
+      Collections.<String>emptyList(), "User is not authenticated",
+      ErrorResponse.UNAUTHORIZED_ERROR_CODE, ErrorResponse.UNAUTHORIZED_SQL_STATE,
+      AvaticaSeverity.ERROR, null);
+
+  /**
+   * Determines if a request is permitted to be executed. The server may require authentication
+   * and the login mechanism might have failed. This check verifies that only authenticated
+   * users are permitted through when the server is requiring authentication. When a user
+   * is disallowed, a status code and response will be automatically written to the provided
+   * <code>response</code> and the caller should return immediately.
+   *
+   * @param serverConfig The server's configuration
+   * @param request The user's request
+   * @param response The response to the user's request
+   * @return True if request can proceed, false otherwise.
+   */
+  public boolean isUserPermitted(AvaticaServerConfiguration serverConfig,
+      HttpServletRequest request, HttpServletResponse response) throws IOException {
+    // Make sure that we drop any unauthenticated users out first.
+    if (null != serverConfig && AuthenticationType.SPNEGO == serverConfig.getAuthenticationType()) {
+      String remoteUser = request.getRemoteUser();
+      if (null == remoteUser) {
+        response.setStatus(HttpURLConnection.HTTP_UNAUTHORIZED);
+        response.getOutputStream().write(UNAUTHORIZED_ERROR.serialize().toByteArray());
+        return false;
+      }
+    }
+
+    return true;
+  }
+}
+
+// End AbstractAvaticaHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/AuthenticationType.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AuthenticationType.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AuthenticationType.java
new file mode 100644
index 0000000..936affe
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AuthenticationType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+/**
+ * An enumeration of the supported types of authentication for the {@link HttpServer}.
+ */
+public enum AuthenticationType {
+  NONE,
+  SPNEGO;
+}
+
+// End AuthenticationType.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
index 34a9333..f5c939c 100644
--- a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
@@ -28,25 +28,24 @@ import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
 import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
 
 import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
-
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 
 import javax.servlet.ServletException;
 import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
+
 /**
  * Jetty handler that executes Avatica JSON request-responses.
  */
-public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareAvaticaHandler {
+public class AvaticaJsonHandler extends AbstractAvaticaHandler {
   private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class);
 
   final Service service;
@@ -57,11 +56,18 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
 
   final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
 
+  final AvaticaServerConfiguration serverConfig;
+
   public AvaticaJsonHandler(Service service) {
-    this(service, NoopMetricsSystem.getInstance());
+    this(service, NoopMetricsSystem.getInstance(), null);
   }
 
   public AvaticaJsonHandler(Service service, MetricsSystem metrics) {
+    this(service, metrics, null);
+  }
+
+  public AvaticaJsonHandler(Service service, MetricsSystem metrics,
+      AvaticaServerConfiguration serverConfig) {
     this.service = Objects.requireNonNull(service);
     this.metrics = Objects.requireNonNull(metrics);
     // Avatica doesn't have a Guava dependency
@@ -76,12 +82,20 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
         return new UnsynchronizedBuffer();
       }
     };
+
+    this.serverConfig = serverConfig;
   }
 
   public void handle(String target, Request baseRequest,
       HttpServletRequest request, HttpServletResponse response)
       throws IOException, ServletException {
     try (final Context ctx = requestTimer.start()) {
+      if (!isUserPermitted(serverConfig, request, response)) {
+        LOG.debug("HTTP request from {} is unauthenticated and authentication is required",
+            request.getRemoteAddr());
+        return;
+      }
+
       response.setContentType("application/json;charset=utf-8");
       response.setStatus(HttpServletResponse.SC_OK);
       if (request.getMethod().equals("POST")) {
@@ -102,7 +116,21 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
             new String(rawRequest.getBytes("ISO-8859-1"), "UTF-8");
         LOG.trace("request: {}", jsonRequest);
 
-        final HandlerResponse<String> jsonResponse = jsonHandler.apply(jsonRequest);
+        final HandlerResponse<String> jsonResponse;
+        if (null != serverConfig && serverConfig.supportsImpersonation()) {
+          try {
+            jsonResponse = serverConfig.doAsRemoteUser(request.getRemoteUser(),
+                request.getRemoteAddr(), new Callable<HandlerResponse<String>>() {
+                  @Override public HandlerResponse<String> call() {
+                    return jsonHandler.apply(jsonRequest);
+                  }
+                });
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          jsonResponse = jsonHandler.apply(jsonRequest);
+        }
         LOG.trace("response: {}", jsonResponse);
         baseRequest.setHandled(true);
         // Set the status code and write out the response.

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
index 27e73de..a465ba1 100644
--- a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -31,13 +31,12 @@ import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
 import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
 
 import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 
 import javax.servlet.ServletException;
 import javax.servlet.ServletInputStream;
@@ -47,7 +46,7 @@ import javax.servlet.http.HttpServletResponse;
 /**
  * Jetty handler that executes Avatica JSON request-responses.
  */
-public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAwareAvaticaHandler {
+public class AvaticaProtobufHandler extends AbstractAvaticaHandler {
   private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class);
 
   private final Service service;
@@ -55,6 +54,7 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
   private final ProtobufTranslation protobufTranslation;
   private final MetricsSystem metrics;
   private final Timer requestTimer;
+  private final AvaticaServerConfiguration serverConfig;
 
   final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
 
@@ -63,6 +63,11 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
   }
 
   public AvaticaProtobufHandler(Service service, MetricsSystem metrics) {
+    this(service, metrics, null);
+  }
+
+  public AvaticaProtobufHandler(Service service, MetricsSystem metrics,
+      AvaticaServerConfiguration serverConfig) {
     this.service = Objects.requireNonNull(service);
     this.metrics = Objects.requireNonNull(metrics);
 
@@ -78,16 +83,25 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
         return new UnsynchronizedBuffer();
       }
     };
+
+    this.serverConfig = serverConfig;
   }
 
   public void handle(String target, Request baseRequest,
       HttpServletRequest request, HttpServletResponse response)
       throws IOException, ServletException {
     try (final Context ctx = this.requestTimer.start()) {
+      // Check if the user is OK to proceed.
+      if (!isUserPermitted(serverConfig, request, response)) {
+        LOG.debug("HTTP request from {} is unauthenticated and authentication is required",
+            request.getRemoteAddr());
+        return;
+      }
+
       response.setContentType("application/octet-stream;charset=utf-8");
       response.setStatus(HttpServletResponse.SC_OK);
       if (request.getMethod().equals("POST")) {
-        byte[] requestBytes;
+        final byte[] requestBytes;
         // Avoid a new buffer creation for every HTTP request
         final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
         try (ServletInputStream inputStream = request.getInputStream()) {
@@ -96,7 +110,22 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
           buffer.reset();
         }
 
-        HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes);
+        final HandlerResponse<byte[]> handlerResponse;
+        if (null != serverConfig && serverConfig.supportsImpersonation()) {
+          // Invoke the ProtobufHandler inside a doAs for the remote user.
+          try {
+            handlerResponse = serverConfig.doAsRemoteUser(request.getRemoteUser(),
+              request.getRemoteAddr(), new Callable<HandlerResponse<byte[]>>() {
+                @Override public HandlerResponse<byte[]> call() {
+                  return pbHandler.apply(requestBytes);
+                }
+              });
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          handlerResponse = pbHandler.apply(requestBytes);
+        }
 
         baseRequest.setHandled(true);
         response.setStatus(handlerResponse.getStatusCode());

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaServerConfiguration.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaServerConfiguration.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaServerConfiguration.java
new file mode 100644
index 0000000..95eee51
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaServerConfiguration.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A generic configuration interface that users can implement to configure the {@link HttpServer}.
+ */
+public interface AvaticaServerConfiguration {
+
+  /**
+   * Returns the type of authentication the {@link HttpServer} should use.
+   * @return An enum corresponding to an authentication mechanism
+   */
+  AuthenticationType getAuthenticationType();
+
+  /**
+   * Returns the Kerberos realm to use for the server's login. Only relevant when
+   * {@link #getAuthenticationType()} returns {@link AuthenticationType#SPNEGO}.
+   *
+   * @return The Kerberos realm for the server login, or null if not applicable.
+   */
+  String getKerberosRealm();
+
+  /**
+   * Returns the Kerberos principal that the Avatica server should log in as.
+   *
+   * @return A Kerberos principal, or null if not applicable.
+   */
+  String getKerberosPrincipal();
+
+  /**
+   * Returns true if the Avatica server should run user requests as that remote user. Otherwise,
+   * all requests are run as the Avatica server user (which is the default).
+   *
+   * @return True if impersonation is enabled, false otherwise.
+   */
+  boolean supportsImpersonation();
+
+  /**
+   * Invokes the given <code>action</code> as the <code>remoteUserName</code>. This will only be
+   * invoked if {@link #supportsImpersonation()} returns <code>true</code>.
+   *
+   * @param remoteUserName The remote user making a request to the Avatica server.
+   * @param remoteAddress The address the remote user is making the request from.
+   * @return The result from the Callable.
+   */
+  <T> T doAsRemoteUser(String remoteUserName, String remoteAddress, Callable<T> action)
+      throws Exception;
+}
+
+// End AvaticaServerConfiguration.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/DoAsRemoteUserCallback.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/DoAsRemoteUserCallback.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DoAsRemoteUserCallback.java
new file mode 100644
index 0000000..c2be80f
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DoAsRemoteUserCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A callback which the server can invoke to allow implementations to control additional logic
+ * about whether or not a client from a specific host should be run.
+ * <p>For example, complex logic could be supplied to control whether clientA from host1 is
+ * permitted to execute queries. This logic is left to the user to implement.
+ */
+public interface DoAsRemoteUserCallback {
+
+  /**
+   * Invokes the given <code>action</code> as the <code>remoteUserName</code>.
+   *
+   * @param remoteUserName The remote user making a request to the Avatica server.
+   * @param remoteAddress The address the remote user is making the request from.
+   * @param action The operation the Avatica server wants to run as <code>remoteUserName</code>.
+   * @return The result from the Callable.
+   */
+  <T> T doAsRemoteUser(String remoteUserName, String remoteAddress, Callable<T> action)
+      throws Exception;
+
+}
+
+// End DoAsRemoteUserCallback.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
index b1fcb40..6c70d1b 100644
--- a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
@@ -25,13 +25,11 @@ import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystemConfiguration;
 import org.apache.calcite.avatica.remote.Driver;
 import org.apache.calcite.avatica.remote.Service;
 
-import org.eclipse.jetty.server.Handler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.ServiceLoader;
 
 /**
@@ -46,29 +44,62 @@ public class HandlerFactory {
    *
    * @param service The underlying {@link Service}.
    * @param serialization The desired message serialization.
-   * @return The {@link Handler}.
+   * @return The {@link AvaticaHandler}.
    */
-  public Handler getHandler(Service service, Driver.Serialization serialization) {
+  public AvaticaHandler getHandler(Service service, Driver.Serialization serialization) {
     return getHandler(service, serialization, NoopMetricsSystemConfiguration.getInstance());
   }
 
   /**
+   * Constructs the desired implementation for the given serialization method and server
+   * configuration with metrics.
+   *
+   * @param service The underlying {@link Service}.
+   * @param serialization The desired message serialization.
+   * @param serverConfig Avatica server configuration or null.
+   * @return The {@link AvaticaHandler}.
+   */
+  public AvaticaHandler getHandler(Service service, Driver.Serialization serialization,
+      AvaticaServerConfiguration serverConfig) {
+    return getHandler(service, serialization, NoopMetricsSystemConfiguration.getInstance(),
+        serverConfig);
+  }
+
+  /**
    * Constructs the desired implementation for the given serialization method with metrics.
    *
    * @param service The underlying {@link Service}.
    * @param serialization The desired message serialization.
    * @param metricsConfig Configuration for the {@link MetricsSystem}.
-   * @return The {@link Handler}.
+   * @return The {@link AvaticaHandler}.
    */
-  public Handler getHandler(Service service, Driver.Serialization serialization,
+  public AvaticaHandler getHandler(Service service, Driver.Serialization serialization,
       MetricsSystemConfiguration<?> metricsConfig) {
-    MetricsSystem metrics = MetricsSystemLoader.load(Objects.requireNonNull(metricsConfig));
+    return getHandler(service, serialization, metricsConfig, null);
+  }
+
+  /**
+   * Constructs the desired implementation for the given serialization method and server
+   * configuration with metrics.
+   *
+   * @param service The underlying {@link Service}
+   * @param serialization The serialization mechanism to use
+   * @param metricsConfig Configuration for the {@link MetricsSystem}.
+   * @param serverConfig Avatica server configuration or null
+   * @return An {@link AvaticaHandler}
+   */
+  public AvaticaHandler getHandler(Service service, Driver.Serialization serialization,
+      MetricsSystemConfiguration<?> metricsConfig, AvaticaServerConfiguration serverConfig) {
+    if (null == metricsConfig) {
+      metricsConfig = NoopMetricsSystemConfiguration.getInstance();
+    }
+    MetricsSystem metrics = MetricsSystemLoader.load(metricsConfig);
 
     switch (serialization) {
     case JSON:
-      return new AvaticaJsonHandler(service, metrics);
+      return new AvaticaJsonHandler(service, metrics, serverConfig);
     case PROTOBUF:
-      return new AvaticaProtobufHandler(service, metrics);
+      return new AvaticaProtobufHandler(service, metrics, serverConfig);
     default:
       throw new IllegalArgumentException("Unknown Avatica handler for " + serialization.name());
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
index c81e899..f99e221 100644
--- a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
@@ -16,21 +16,42 @@
  */
 package org.apache.calcite.avatica.server;
 
+import org.apache.calcite.avatica.metrics.MetricsSystemConfiguration;
+import org.apache.calcite.avatica.remote.Driver.Serialization;
+import org.apache.calcite.avatica.remote.Service;
 import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
 
+import org.eclipse.jetty.security.ConstraintMapping;
+import org.eclipse.jetty.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.security.authentication.SpnegoAuthenticator;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerList;
+import org.eclipse.jetty.util.security.Constraint;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
 
 /**
  * Avatica HTTP server.
@@ -44,12 +65,18 @@ public class HttpServer {
   private Server server;
   private int port = -1;
   private final AvaticaHandler handler;
+  private final AvaticaServerConfiguration config;
+  private final Subject subject;
 
   @Deprecated
   public HttpServer(Handler handler) {
     this(wrapJettyHandler(handler));
   }
 
+  /**
+   * Constructs an {@link HttpServer} which binds to an ephemeral port.
+   * @param handler The Handler to run
+   */
   public HttpServer(AvaticaHandler handler) {
     this(0, handler);
   }
@@ -59,9 +86,38 @@ public class HttpServer {
     this(port, wrapJettyHandler(handler));
   }
 
+  /**
+   * Constructs an {@link HttpServer} with no additional configuration.
+   * @param port The listen port
+   * @param handler The Handler to run
+   */
   public HttpServer(int port, AvaticaHandler handler) {
+    this(port, handler, null);
+  }
+
+  /**
+   * Constructs an {@link HttpServer}.
+   * @param port The listen port
+   * @param handler The Handler to run
+   * @param config Optional configuration for the server
+   */
+  public HttpServer(int port, AvaticaHandler handler, AvaticaServerConfiguration config) {
+    this(port, handler, config, null);
+  }
+
+  /**
+   * Constructs an {@link HttpServer}.
+   * @param port The listen port
+   * @param handler The Handler to run
+   * @param config Optional configuration for the server
+   * @param subject The javax.security Subject for the server, or null
+   */
+  public HttpServer(int port, AvaticaHandler handler, AvaticaServerConfiguration config,
+      Subject subject) {
     this.port = port;
     this.handler = handler;
+    this.config = config;
+    this.subject = subject;
   }
 
   private static AvaticaHandler wrapJettyHandler(Handler handler) {
@@ -73,6 +129,20 @@ public class HttpServer {
   }
 
   public void start() {
+    if (null != subject) {
+      // Run the start in the privileged block (as the kerberos-identified user)
+      Subject.doAs(subject, new PrivilegedAction<Void>() {
+        @Override public Void run() {
+          internalStart();
+          return null;
+        }
+      });
+    } else {
+      internalStart();
+    }
+  }
+
+  protected void internalStart() {
     if (server != null) {
       throw new RuntimeException("Server is already started");
     }
@@ -83,11 +153,34 @@ public class HttpServer {
     server.manage(threadPool);
 
     final ServerConnector connector = configureConnector(new ServerConnector(server), port);
+    ConstraintSecurityHandler spnegoHandler = null;
+
+    if (null != this.config) {
+      switch (config.getAuthenticationType()) {
+      case SPNEGO:
+        // Get the Handler for SPNEGO authentication
+        spnegoHandler = configureSpnego(server, connector, this.config);
+        break;
+      default:
+        // Pass
+        break;
+      }
+    }
 
     server.setConnectors(new Connector[] { connector });
 
+    // Default to using the handler that was passed in
     final HandlerList handlerList = new HandlerList();
-    handlerList.setHandlers(new Handler[] { handler, new DefaultHandler() });
+    Handler avaticaHandler = handler;
+
+    // Wrap the provided handler for SPNEGO if we made one
+    if (null != spnegoHandler) {
+      spnegoHandler.setHandler(handler);
+      avaticaHandler = spnegoHandler;
+    }
+
+    handlerList.setHandlers(new Handler[] {avaticaHandler, new DefaultHandler()});
+
     server.setHandler(handlerList);
     try {
       server.start();
@@ -122,6 +215,41 @@ public class HttpServer {
   }
 
   /**
+   * Creates the security handler that enforces SPNEGO authentication per the <code>config</code>.
+   *
+   * @param connector The connector to configure
+   * @param config The configuration
+   */
+  protected ConstraintSecurityHandler configureSpnego(Server server, ServerConnector connector,
+      AvaticaServerConfiguration config) {
+    final String realm = Objects.requireNonNull(config.getKerberosRealm());
+    final String principal = Objects.requireNonNull(config.getKerberosPrincipal());
+
+    Constraint constraint = new Constraint();
+    constraint.setName(Constraint.__SPNEGO_AUTH);
+    constraint.setRoles(new String[]{realm});
+    // This is telling Jetty to not allow unauthenticated requests through (very important!)
+    constraint.setAuthenticate(true);
+
+    ConstraintMapping cm = new ConstraintMapping();
+    cm.setConstraint(constraint);
+    cm.setPathSpec("/*");
+
+    // A customization of SpnegoLoginService to explicitly set the server's principal, otherwise
+    // we would have to require a custom file to set the server's principal.
+    PropertyBasedSpnegoLoginService spnegoLoginService =
+        new PropertyBasedSpnegoLoginService(realm, principal);
+
+    ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
+    sh.setAuthenticator(new SpnegoAuthenticator());
+    sh.setLoginService(spnegoLoginService);
+    sh.setConstraintMappings(new ConstraintMapping[]{cm});
+    sh.setRealmName(realm);
+
+    return sh;
+  }
+
+  /**
    * Configures the server connector.
    *
    * <p>The default configuration sets a timeout of 1 minute and disables
@@ -163,6 +291,263 @@ public class HttpServer {
   public int getPort() {
     return port;
   }
+
+  /**
+   * Builder class for creating instances of {@link HttpServer}.
+   *
+   * <p>Unless {@link #withSpnego(String)} or {@link #withSpnego(String, String)} is called,
+   * the server is built without authentication. A keytab supplied through
+   * {@link #withAutomaticLogin(File)} is only consulted when SPNEGO is enabled.
+   */
+  public static class Builder {
+    private int port;
+
+    private Service service;
+    private Serialization serialization;
+    private AvaticaHandler handler = null;
+
+    private MetricsSystemConfiguration<?> metricsConfig;
+
+    private AuthenticationType authenticationType = AuthenticationType.NONE;
+
+    private String kerberosPrincipal;
+    private String kerberosRealm;
+    private File keytab;
+
+    private DoAsRemoteUserCallback remoteUserCallback;
+
+    public Builder() {}
+
+    /**
+     * Sets the port the server should listen on.
+     *
+     * @param port The port number
+     * @return <code>this</code>
+     */
+    public Builder withPort(int port) {
+      this.port = port;
+      return this;
+    }
+
+    /**
+     * Sets the {@link Service} and {@link Serialization} information necessary to construct
+     * the appropriate {@link AvaticaHandler}.
+     *
+     * @param service The Avatica service
+     * @param serialization The serialization method
+     * @return <code>this</code>
+     */
+    public Builder withHandler(Service service, Serialization serialization) {
+      this.service = Objects.requireNonNull(service);
+      this.serialization = Objects.requireNonNull(serialization);
+      return this;
+    }
+
+    /**
+     * Sets an {@link AvaticaHandler} directly on the builder. Most users will not want to use
+     * this method and should instead use {@link #withHandler(Service, Serialization)}.
+     *
+     * <p>A handler set here takes precedence over one that would be created from a
+     * {@link Service} and {@link Serialization}.
+     *
+     * @param handler The handler
+     * @return <code>this</code>
+     */
+    public Builder withHandler(AvaticaHandler handler) {
+      this.handler = Objects.requireNonNull(handler);
+      return this;
+    }
+
+    /**
+     * Sets the given configuration to enable metrics collection in the server.
+     *
+     * @param metricsConfig Configuration object for metrics.
+     * @return <code>this</code>
+     */
+    public Builder withMetricsConfiguration(MetricsSystemConfiguration<?> metricsConfig) {
+      this.metricsConfig = Objects.requireNonNull(metricsConfig);
+      return this;
+    }
+
+    /**
+     * Configures the server to use SPNEGO authentication. This method requires that the
+     * <code>principal</code> contains the Kerberos realm.
+     *
+     * @param principal A kerberos principal with the realm required.
+     * @return <code>this</code>
+     * @throws IllegalArgumentException If the principal has no <code>'@'</code> or if nothing
+     *     follows the last <code>'@'</code>
+     */
+    public Builder withSpnego(String principal) {
+      Objects.requireNonNull(principal, "principal");
+      final int index = principal.lastIndexOf('@');
+      if (-1 == index) {
+        throw new IllegalArgumentException("Could not find '@' symbol in '" + principal
+            + "' to parse the Kerberos realm from the principal");
+      }
+      final String realm = principal.substring(index + 1);
+      // A trailing '@' would otherwise silently configure an empty realm
+      if (realm.isEmpty()) {
+        throw new IllegalArgumentException("Kerberos realm is empty in principal '" + principal
+            + "'");
+      }
+      return withSpnego(principal, realm);
+    }
+
+    /**
+     * Configures the server to use SPNEGO authentication. It is required that callers are logged
+     * in via Kerberos already or have provided the necessary configuration to automatically log
+     * in via JAAS (using the <code>java.security.auth.login.config</code> system property) before
+     * starting the {@link HttpServer}.
+     *
+     * @param principal The kerberos principal
+     * @param realm The kerberos realm
+     * @return <code>this</code>
+     */
+    public Builder withSpnego(String principal, String realm) {
+      this.authenticationType = AuthenticationType.SPNEGO;
+      this.kerberosPrincipal = Objects.requireNonNull(principal);
+      this.kerberosRealm = Objects.requireNonNull(realm);
+      return this;
+    }
+
+    /**
+     * Sets a keytab to be used to perform a Kerberos login automatically (without the use of JAAS).
+     *
+     * @param keytab A KeyTab file for the server's login.
+     * @return <code>this</code>
+     */
+    public Builder withAutomaticLogin(File keytab) {
+      this.keytab = Objects.requireNonNull(keytab);
+      return this;
+    }
+
+    /**
+     * Sets a callback implementation to defer the logic on how to run an action as a given user and
+     * if the action should be permitted for that user.
+     *
+     * @param remoteUserCallback User-provided implementation of the callback
+     * @return <code>this</code>
+     */
+    public Builder withImpersonation(DoAsRemoteUserCallback remoteUserCallback) {
+      this.remoteUserCallback = Objects.requireNonNull(remoteUserCallback);
+      return this;
+    }
+
+    /**
+     * Builds the HttpServer instance from <code>this</code>.
+     *
+     * <p>When SPNEGO is enabled and a keytab was provided, the Kerberos login is performed
+     * as part of this call.
+     *
+     * @return An HttpServer.
+     * @throws RuntimeException If the keytab-based Kerberos login fails
+     */
+    public HttpServer build() {
+      final AvaticaServerConfiguration serverConfig;
+      final Subject subject;
+      switch (authenticationType) {
+      case NONE:
+        serverConfig = null;
+        subject = null;
+        break;
+      case SPNEGO:
+        if (null != keytab) {
+          LOG.debug("Performing Kerberos login with {} as {}", keytab, kerberosPrincipal);
+          subject = loginViaKerberos(this);
+        } else {
+          LOG.debug("Not performing Kerberos login");
+          subject = null;
+        }
+        serverConfig = buildSpnegoConfiguration(this);
+        break;
+      default:
+        throw new IllegalArgumentException("Unhandled AuthenticationType");
+      }
+
+      AvaticaHandler handler = buildHandler(this, serverConfig);
+
+      return new HttpServer(port, handler, serverConfig, subject);
+    }
+
+    /**
+     * Creates the appropriate {@link AvaticaHandler}.
+     *
+     * @param b The {@link Builder}.
+     * @param config The Avatica server configuration
+     * @return An {@link AvaticaHandler}.
+     */
+    private AvaticaHandler buildHandler(Builder b, AvaticaServerConfiguration config) {
+      // The user provided a handler explicitly.
+      if (null != b.handler) {
+        return b.handler;
+      }
+
+      // Normal case, we create the handler for the user.
+      HandlerFactory factory = new HandlerFactory();
+      return factory.getHandler(b.service, b.serialization, b.metricsConfig, config);
+    }
+
+    /**
+     * Builds an {@link AvaticaServerConfiguration} implementation for SPNEGO-based authentication.
+     *
+     * <p>The returned configuration captures the principal, realm and impersonation callback
+     * as they are set on the builder at the time of this call.
+     *
+     * @param b The {@link Builder}.
+     * @return A configuration instance.
+     */
+    private AvaticaServerConfiguration buildSpnegoConfiguration(Builder b) {
+      final String principal = b.kerberosPrincipal;
+      final String realm = b.kerberosRealm;
+      final DoAsRemoteUserCallback callback = b.remoteUserCallback;
+      return new AvaticaServerConfiguration() {
+
+        @Override public AuthenticationType getAuthenticationType() {
+          return AuthenticationType.SPNEGO;
+        }
+
+        @Override public String getKerberosRealm() {
+          return realm;
+        }
+
+        @Override public String getKerberosPrincipal() {
+          return principal;
+        }
+
+        @Override public boolean supportsImpersonation() {
+          return null != callback;
+        }
+
+        @Override public <T> T doAsRemoteUser(String remoteUserName, String remoteAddress,
+            Callable<T> action) throws Exception {
+          // Without a callback there is nothing to delegate to; fail with a clear message
+          // rather than a NullPointerException.
+          if (null == callback) {
+            throw new UnsupportedOperationException(
+                "Impersonation was not configured on this server");
+          }
+          return callback.doAsRemoteUser(remoteUserName, remoteAddress, action);
+        }
+      };
+    }
+
+    /**
+     * Performs a keytab-based Kerberos login for the builder's principal.
+     *
+     * @param b The {@link Builder} supplying the principal and keytab.
+     * @return The logged-in {@link Subject}.
+     * @throws RuntimeException If the login fails; the {@link LoginException} is the cause.
+     */
+    private Subject loginViaKerberos(Builder b) {
+      Set<Principal> principals = new HashSet<Principal>();
+      principals.add(new KerberosPrincipal(b.kerberosPrincipal));
+
+      Subject subject = new Subject(false, principals, new HashSet<Object>(),
+          new HashSet<Object>());
+
+      KeytabJaasConf conf = new KeytabJaasConf(b.kerberosPrincipal, b.keytab.toString());
+      // KeytabJaasConf ignores the name it is asked for, so the value here is irrelevant.
+      String confName = "NotUsed";
+      try {
+        LoginContext loginContext = new LoginContext(confName, subject, null, conf);
+        loginContext.login();
+        return loginContext.getSubject();
+      } catch (LoginException e) {
+        throw new RuntimeException("Failed to perform Kerberos login for "
+            + b.kerberosPrincipal + " using keytab " + b.keytab, e);
+      }
+    }
+
+    /**
+     * Javax Configuration class which always returns a configuration for our keytab-based
+     * login.
+     */
+    private static class KeytabJaasConf extends javax.security.auth.login.Configuration {
+      private final String principal;
+      private final String keytab;
+
+      private KeytabJaasConf(String principal, String keytab) {
+        this.principal = principal;
+        this.keytab = keytab;
+      }
+
+      /**
+       * Returns a single, required Krb5LoginModule entry regardless of <code>name</code>.
+       *
+       * @param name The requested configuration name (ignored)
+       * @return The keytab-based login configuration
+       */
+      @Override public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+        Map<String, String> options = new HashMap<String, String>();
+        options.put("storeKey", "true");
+        options.put("principal", principal);
+        options.put("keyTab", keytab);
+        options.put("doNotPrompt", "true");
+        options.put("useKeyTab", "true");
+        options.put("isInitiator", "false");
+        // Lower-case with a fixed locale so the result does not depend on the default locale.
+        options.put("debug", System.getProperty("sun.security.krb5.debug", "false")
+            .toLowerCase(java.util.Locale.ROOT));
+
+        return new AppConfigurationEntry[] {new AppConfigurationEntry(getKrb5LoginModuleName(),
+            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options)};
+      }
+    }
+
+    /**
+     * Chooses the Krb5LoginModule class name based on the <code>java.vendor</code> system
+     * property.
+     *
+     * @return The IBM login module name when the vendor contains "IBM", else the Sun one.
+     */
+    private static String getKrb5LoginModuleName() {
+      return System.getProperty("java.vendor").contains("IBM")
+          ? "com.ibm.security.auth.module.Krb5LoginModule"
+          : "com.sun.security.auth.module.Krb5LoginModule";
+    }
+  }
 }
 
 // End HttpServer.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/main/java/org/apache/calcite/avatica/server/PropertyBasedSpnegoLoginService.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/PropertyBasedSpnegoLoginService.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/PropertyBasedSpnegoLoginService.java
new file mode 100644
index 0000000..9e373fb
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/PropertyBasedSpnegoLoginService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.eclipse.jetty.security.SpnegoLoginService;
+
+import java.lang.reflect.Field;
+import java.util.Objects;
+
+/**
+ * A {@link SpnegoLoginService} whose server principal is handed in through the constructor
+ * rather than being read from a file. Known to work with Jetty-9.2.x, any other
+ * version would require testing/inspection to ensure the logic is still sound.
+ */
+public class PropertyBasedSpnegoLoginService extends SpnegoLoginService {
+
+  private static final String TARGET_NAME_FIELD_NAME = "_targetName";
+  private final String serverPrincipal;
+
+  public PropertyBasedSpnegoLoginService(String realm, String serverPrincipal) {
+    super(realm);
+    this.serverPrincipal = Objects.requireNonNull(serverPrincipal);
+  }
+
+  /**
+   * Replaces the parent's start-up logic: the only work done is to push the configured
+   * principal into the parent's target-name field, so no one-line file is needed.
+   *
+   * <p>AbstractLifeCycle's doStart() method does nothing, so we aren't missing any extra logic.
+   */
+  @Override protected void doStart() throws Exception {
+    assignTargetName();
+  }
+
+  /**
+   * Reflectively writes {@link #serverPrincipal} into the parent's private target-name field.
+   */
+  private void assignTargetName() throws NoSuchFieldException, IllegalAccessException {
+    final Field field = SpnegoLoginService.class.getDeclaredField(TARGET_NAME_FIELD_NAME);
+    field.setAccessible(true);
+    field.set(this, serverPrincipal);
+  }
+}
+
+// End PropertyBasedSpnegoLoginService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/406372f1/avatica/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java b/avatica/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
new file mode 100644
index 0000000..39de6fe
--- /dev/null
+++ b/avatica/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.jdbc.JdbcMeta;
+import org.apache.calcite.avatica.remote.Driver;
+import org.apache.calcite.avatica.remote.LocalService;
+import org.apache.calcite.avatica.server.HttpServer;
+
+import org.apache.kerby.kerberos.kerb.KrbException;
+import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
+import org.apache.kerby.kerberos.kerb.client.KrbConfig;
+import org.apache.kerby.kerberos.kerb.client.KrbConfigKey;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import javax.security.auth.Subject;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * End to end test case for SPNEGO with Avatica.
+ *
+ * <p>The KDC and one {@link HttpServer} per serialization are started while the test
+ * parameters are computed; each parameterized instance then runs against its own server.
+ */
+@RunWith(Parameterized.class)
+public class AvaticaSpnegoTest {
+  private static final Logger LOG = LoggerFactory.getLogger(AvaticaSpnegoTest.class);
+
+  private static final ConnectionSpec CONNECTION_SPEC = ConnectionSpec.HSQLDB;
+
+  private static SimpleKdcServer kdc;
+  private static KrbConfig clientConfig;
+  private static File keytabDir;
+
+  private static int kdcPort;
+  private static File clientKeytab;
+  private static File serverKeytab;
+
+  private static boolean isKdcStarted = false;
+
+  /**
+   * Starts the KDC in a fresh working directory under <code>target</code>, creates the client
+   * and server principals with their keytabs, and prepares the client Kerberos configuration.
+   */
+  private static void setupKdc() throws Exception {
+    kdc = new SimpleKdcServer();
+    File target = new File(System.getProperty("user.dir"), "target");
+    assertTrue(target.exists());
+
+    File kdcDir = new File(target, AvaticaSpnegoTest.class.getSimpleName());
+    if (kdcDir.exists()) {
+      SpnegoTestUtil.deleteRecursively(kdcDir);
+    }
+    // Fail early with a clear message instead of a confusing KDC error later on
+    assertTrue("Failed to create " + kdcDir, kdcDir.mkdirs() || kdcDir.isDirectory());
+    kdc.setWorkDir(kdcDir);
+
+    kdc.setKdcHost(SpnegoTestUtil.KDC_HOST);
+    kdcPort = SpnegoTestUtil.getFreePort();
+    kdc.setAllowTcp(true);
+    kdc.setAllowUdp(false);
+    kdc.setKdcTcpPort(kdcPort);
+
+    LOG.info("Starting KDC server at {}:{}", SpnegoTestUtil.KDC_HOST, kdcPort);
+
+    kdc.init();
+    kdc.start();
+    isKdcStarted = true;
+
+    keytabDir = new File(target, AvaticaSpnegoTest.class.getSimpleName()
+        + "_keytabs");
+    if (keytabDir.exists()) {
+      SpnegoTestUtil.deleteRecursively(keytabDir);
+    }
+    assertTrue("Failed to create " + keytabDir, keytabDir.mkdirs() || keytabDir.isDirectory());
+    setupServerUser(keytabDir);
+
+    clientConfig = new KrbConfig();
+    clientConfig.setString(KrbConfigKey.KDC_HOST, SpnegoTestUtil.KDC_HOST);
+    clientConfig.setInt(KrbConfigKey.KDC_TCP_PORT, kdcPort);
+    clientConfig.setString(KrbConfigKey.DEFAULT_REALM, SpnegoTestUtil.REALM);
+
+    // Kerby sets "java.security.krb5.conf" for us!
+    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+    // System.setProperty("sun.security.spnego.debug", "true");
+    // System.setProperty("sun.security.krb5.debug", "true");
+  }
+
+  @AfterClass public static void stopKdc() throws Exception {
+    if (isKdcStarted) {
+      LOG.info("Stopping KDC on {}", kdcPort);
+      kdc.stop();
+    }
+  }
+
+  /**
+   * Creates both the client and the server principals in the KDC and writes their keytabs
+   * into <code>keytabDir</code>.
+   *
+   * @param keytabDir Directory in which the keytab files are created
+   */
+  private static void setupServerUser(File keytabDir) throws KrbException {
+    // Create the client user
+    String clientPrincipal = SpnegoTestUtil.CLIENT_PRINCIPAL.substring(0,
+        SpnegoTestUtil.CLIENT_PRINCIPAL.indexOf('@'));
+    clientKeytab = new File(keytabDir, clientPrincipal.replace('/', '_') + ".keytab");
+    if (clientKeytab.exists()) {
+      SpnegoTestUtil.deleteRecursively(clientKeytab);
+    }
+    LOG.info("Creating {} with keytab {}", clientPrincipal, clientKeytab);
+    SpnegoTestUtil.setupUser(kdc, clientKeytab, clientPrincipal);
+
+    // Create the server user. The realm-less name is only used for the keytab file name;
+    // the principal itself is registered under its full name.
+    String serverPrincipal = SpnegoTestUtil.SERVER_PRINCIPAL.substring(0,
+        SpnegoTestUtil.SERVER_PRINCIPAL.indexOf('@'));
+    serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
+    if (serverKeytab.exists()) {
+      SpnegoTestUtil.deleteRecursively(serverKeytab);
+    }
+    LOG.info("Creating {} with keytab {}", SpnegoTestUtil.SERVER_PRINCIPAL, serverKeytab);
+    SpnegoTestUtil.setupUser(kdc, serverKeytab, SpnegoTestUtil.SERVER_PRINCIPAL);
+  }
+
+  /**
+   * Starts the KDC and one SPNEGO-enabled server per serialization.
+   *
+   * @return One <code>{HttpServer, JDBC URL}</code> pair per serialization
+   */
+  @Parameters public static List<Object[]> parameters() throws Exception {
+    final List<Object[]> parameters = new ArrayList<>();
+
+    // Start the KDC
+    setupKdc();
+
+    // Create a LocalService around HSQLDB
+    final JdbcMeta jdbcMeta = new JdbcMeta(CONNECTION_SPEC.url,
+        CONNECTION_SPEC.username, CONNECTION_SPEC.password);
+    final LocalService localService = new LocalService(jdbcMeta);
+
+    for (Driver.Serialization serialization : new Driver.Serialization[] {
+      Driver.Serialization.JSON, Driver.Serialization.PROTOBUF}) {
+      // Build and start the server
+      HttpServer httpServer = new HttpServer.Builder()
+          .withPort(0)
+          .withAutomaticLogin(serverKeytab)
+          .withSpnego(SpnegoTestUtil.SERVER_PRINCIPAL, SpnegoTestUtil.REALM)
+          .withHandler(localService, serialization)
+          .build();
+      httpServer.start();
+
+      final String url = "jdbc:avatica:remote:url=http://" + SpnegoTestUtil.KDC_HOST + ":"
+          + httpServer.getPort() + ";authentication=SPNEGO;serialization=" + serialization;
+      LOG.info("JDBC URL {}", url);
+
+      parameters.add(new Object[] {httpServer, url});
+    }
+
+    return parameters;
+  }
+
+  private final HttpServer httpServer;
+  private final String jdbcUrl;
+
+  public AvaticaSpnegoTest(HttpServer httpServer, String jdbcUrl) {
+    this.httpServer = Objects.requireNonNull(httpServer);
+    this.jdbcUrl = Objects.requireNonNull(jdbcUrl);
+  }
+
+  @After public void stopHttpServer() {
+    if (null != httpServer) {
+      httpServer.stop();
+    }
+  }
+
+  /**
+   * Logs in as the client principal and verifies that DDL, DML and a query all succeed
+   * through the SPNEGO-protected server.
+   */
+  @Test public void testAuthenticatedClient() throws Exception {
+    ConnectionSpec.getDatabaseLock().lock();
+    try {
+      final String tableName = "allowed_clients";
+      // Create the subject for the client
+      final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(SpnegoTestUtil.CLIENT_PRINCIPAL,
+          clientKeytab);
+
+      // Run this code, logged in as the subject (the client)
+      Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
+        @Override public Void run() throws Exception {
+          try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
+            try (Statement stmt = conn.createStatement()) {
+              assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
+              assertFalse(stmt.execute("CREATE TABLE " + tableName + "(pk integer)"));
+              assertEquals(1, stmt.executeUpdate("INSERT INTO " + tableName + " VALUES(1)"));
+              assertEquals(1, stmt.executeUpdate("INSERT INTO " + tableName + " VALUES(2)"));
+              assertEquals(1, stmt.executeUpdate("INSERT INTO " + tableName + " VALUES(3)"));
+
+              // Close the ResultSet deterministically, like the connection and statement
+              try (ResultSet results = stmt.executeQuery("SELECT count(1) FROM " + tableName)) {
+                assertTrue(results.next());
+                assertEquals(3, results.getInt(1));
+              }
+            }
+          }
+          return null;
+        }
+      });
+    } finally {
+      ConnectionSpec.getDatabaseLock().unlock();
+    }
+  }
+}
+
+// End AvaticaSpnegoTest.java