You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/02 14:50:13 UTC

[1/7] drill git commit: DRILL-5425: Support HTTP Kerberos auth using SPNEGO

Repository: drill
Updated Branches:
  refs/heads/master e25c58f7b -> b4ffa4012


http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
new file mode 100644
index 0000000..2b7da56
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.spnego;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.categories.SecurityTest;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.server.rest.WebServerConstants;
+import org.apache.drill.exec.server.rest.auth.DrillSpnegoAuthenticator;
+import org.apache.drill.exec.server.rest.auth.DrillSpnegoLoginService;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.security.Authenticator;
+import org.eclipse.jetty.security.DefaultIdentityService;
+import org.eclipse.jetty.security.UserAuthentication;
+import org.eclipse.jetty.security.authentication.SessionAuthentication;
+import org.eclipse.jetty.server.Authentication;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import sun.security.jgss.GSSUtil;
+
+import javax.security.auth.Subject;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import java.lang.reflect.Field;
+import java.security.PrivilegedExceptionAction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for validating {@link DrillSpnegoAuthenticator}
+ */
+@Ignore("See DRILL-5387")
+@Category(SecurityTest.class)
+public class TestDrillSpnegoAuthenticator {
+
+  private static KerberosHelper spnegoHelper;
+
+  private static final String primaryName = "HTTP";
+
+  private static DrillSpnegoAuthenticator spnegoAuthenticator;
+
+  private static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    spnegoHelper = new KerberosHelper(TestSpnegoAuthentication.class.getSimpleName(), primaryName);
+    spnegoHelper.setupKdc(dirTestWatcher.getTmpDir());
+
+
+    sun.security.krb5.Config.refresh();
+
+    // (2) Reset the default realm.
+    final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
+    defaultRealm.setAccessible(true);
+    defaultRealm.set(null, KerberosUtil.getDefaultRealm());
+
+    // Create a DrillbitContext with service principal and keytab for DrillSpnegoLoginService
+    final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+        .withValue(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("spnego")))
+        .withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL))
+        .withValue(ExecConstants.HTTP_SPNEGO_KEYTAB,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString())),
+        false);
+
+    // Create mock objects for optionManager and AuthConfiguration
+    final SystemOptionManager optionManager = Mockito.mock(SystemOptionManager.class);
+    Mockito.when(optionManager.getOption(ExecConstants.ADMIN_USERS_VALIDATOR))
+        .thenReturn(ExecConstants.ADMIN_USERS_VALIDATOR.DEFAULT_ADMIN_USERS);
+    Mockito.when(optionManager.getOption(ExecConstants.ADMIN_USER_GROUPS_VALIDATOR))
+        .thenReturn(ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.DEFAULT_ADMIN_USER_GROUPS);
+
+    final DrillbitContext drillbitContext = Mockito.mock(DrillbitContext.class);
+    Mockito.when(drillbitContext.getConfig()).thenReturn(newConfig);
+    Mockito.when(drillbitContext.getOptionManager()).thenReturn(optionManager);
+
+    Authenticator.AuthConfiguration authConfiguration = Mockito.mock(Authenticator.AuthConfiguration.class);
+
+    spnegoAuthenticator = new DrillSpnegoAuthenticator("SPNEGO");
+    DrillSpnegoLoginService spnegoLoginService = new DrillSpnegoLoginService(drillbitContext);
+
+    Mockito.when(authConfiguration.getLoginService()).thenReturn(spnegoLoginService);
+    Mockito.when(authConfiguration.getIdentityService()).thenReturn(new DefaultIdentityService());
+    Mockito.when(authConfiguration.isSessionRenewedOnAuthentication()).thenReturn(true);
+
+    // Set the login service and identity service inside SpnegoAuthenticator
+    spnegoAuthenticator.setConfiguration(authConfiguration);
+  }
+
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    spnegoHelper.stopKdc();
+  }
+
+  /**
+   * Test to verify response when request is sent for {@link WebServerConstants#SPENGO_LOGIN_RESOURCE_PATH} from
+   * unauthenticated session. Expectation is client will receive response with Negotiate header.
+   * @throws Exception
+   */
+  @Test
+  public void testNewSessionReqForSpnegoLogin() throws Exception {
+    final HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+    final HttpSession session = Mockito.mock(HttpSession.class);
+
+    Mockito.when(request.getSession(true)).thenReturn(session);
+    Mockito.when(request.getRequestURI()).thenReturn(WebServerConstants.SPENGO_LOGIN_RESOURCE_PATH);
+
+    final Authentication authentication = spnegoAuthenticator.validateRequest(request, response, false);
+
+    assertEquals(authentication, Authentication.SEND_CONTINUE);
+    verify(response).sendError(401);
+    verify(response).setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), HttpHeader.NEGOTIATE.asString());
+  }
+
+  /**
+   * Test to verify response when request is sent for {@link WebServerConstants#SPENGO_LOGIN_RESOURCE_PATH} from
+   * authenticated session. Expectation is server will find the authenticated UserIdentity.
+   * @throws Exception
+   */
+  @Test
+  public void testAuthClientRequestForSpnegoLoginResource() throws Exception {
+
+    final HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+    final HttpSession session = Mockito.mock(HttpSession.class);
+    final Authentication authentication = Mockito.mock(UserAuthentication.class);
+
+    Mockito.when(request.getSession(true)).thenReturn(session);
+    Mockito.when(request.getRequestURI()).thenReturn(WebServerConstants.SPENGO_LOGIN_RESOURCE_PATH);
+    Mockito.when(session.getAttribute(SessionAuthentication.__J_AUTHENTICATED)).thenReturn(authentication);
+
+    final UserAuthentication returnedAuthentication = (UserAuthentication) spnegoAuthenticator.validateRequest
+        (request, response, false);
+    assertEquals(authentication, returnedAuthentication);
+    verify(response, never()).sendError(401);
+    verify(response, never()).setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), HttpHeader.NEGOTIATE.asString());
+  }
+
+  /**
+   * Test to verify response when request is sent for any other resource other than
+   * {@link WebServerConstants#SPENGO_LOGIN_RESOURCE_PATH} from authenticated session. Expectation is server will
+   * find the authenticated UserIdentity and will not perform the authentication again for new resource.
+   * @throws Exception
+   */
+  @Test
+  public void testAuthClientRequestForOtherPage() throws Exception {
+
+    final HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+    final HttpSession session = Mockito.mock(HttpSession.class);
+    final Authentication authentication = Mockito.mock(UserAuthentication.class);
+
+    Mockito.when(request.getSession(true)).thenReturn(session);
+    Mockito.when(request.getRequestURI()).thenReturn(WebServerConstants.WEBSERVER_ROOT_PATH);
+    Mockito.when(session.getAttribute(SessionAuthentication.__J_AUTHENTICATED)).thenReturn(authentication);
+
+    final UserAuthentication returnedAuthentication = (UserAuthentication) spnegoAuthenticator.validateRequest
+        (request, response, false);
+    assertEquals(authentication, returnedAuthentication);
+    verify(response, never()).sendError(401);
+    verify(response, never()).setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), HttpHeader.NEGOTIATE.asString());
+  }
+
+  /**
+   * Test to verify that when request is sent for {@link WebServerConstants#LOGOUT_RESOURCE_PATH} then the UserIdentity
+   * will be removed from the session and returned authentication will be null from
+   * {@link DrillSpnegoAuthenticator#validateRequest(ServletRequest, ServletResponse, boolean)}
+   * @throws Exception
+   */
+  @Test
+  public void testAuthClientRequestForLogOut() throws Exception {
+    final HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+    final HttpSession session = Mockito.mock(HttpSession.class);
+    final Authentication authentication = Mockito.mock(UserAuthentication.class);
+
+    Mockito.when(request.getSession(true)).thenReturn(session);
+    Mockito.when(request.getRequestURI()).thenReturn(WebServerConstants.LOGOUT_RESOURCE_PATH);
+    Mockito.when(session.getAttribute(SessionAuthentication.__J_AUTHENTICATED)).thenReturn(authentication);
+
+    final UserAuthentication returnedAuthentication = (UserAuthentication) spnegoAuthenticator.validateRequest
+        (request, response, false);
+    assertNull(returnedAuthentication);
+    verify(session).removeAttribute(SessionAuthentication.__J_AUTHENTICATED);
+    verify(response, never()).sendError(401);
+    verify(response, never()).setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), HttpHeader.NEGOTIATE.asString());
+  }
+
+  /**
+   * Test to verify authentication fails when client sends invalid SPNEGO token for the
+   * {@link WebServerConstants#SPENGO_LOGIN_RESOURCE_PATH} resource.
+   * @throws Exception
+   */
+  @Test
+  public void testSpnegoLoginInvalidToken() throws Exception {
+
+    final HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+    final HttpSession session = Mockito.mock(HttpSession.class);
+
+    // Create client subject using it's principal and keytab
+    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(spnegoHelper.CLIENT_PRINCIPAL,
+        spnegoHelper.clientKeytab.getAbsoluteFile());
+
+    // Generate a SPNEGO token for the peer SERVER_PRINCIPAL from this CLIENT_PRINCIPAL
+    final String token = Subject.doAs(clientSubject, new PrivilegedExceptionAction<String>() {
+      @Override
+      public String run() throws Exception {
+
+        final GSSManager gssManager = GSSManager.getInstance();
+        GSSContext gssContext = null;
+        try {
+          final Oid oid = GSSUtil.GSS_SPNEGO_MECH_OID;
+          final GSSName serviceName = gssManager.createName(spnegoHelper.SERVER_PRINCIPAL, GSSName.NT_USER_NAME, oid);
+
+          gssContext = gssManager.createContext(serviceName, oid, null, GSSContext.DEFAULT_LIFETIME);
+          gssContext.requestCredDeleg(true);
+          gssContext.requestMutualAuth(true);
+
+          byte[] outToken = new byte[0];
+          outToken = gssContext.initSecContext(outToken, 0, outToken.length);
+          return Base64.encodeBase64String(outToken);
+
+        } finally {
+          if (gssContext != null) {
+            gssContext.dispose();
+          }
+        }
+      }
+    });
+
+    Mockito.when(request.getSession(true)).thenReturn(session);
+
+    final String httpReqAuthHeader = String.format("%s:%s", HttpHeader.NEGOTIATE.asString(), String.format
+        ("%s%s","1234", token));
+    Mockito.when(request.getHeader(HttpHeader.AUTHORIZATION.asString())).thenReturn(httpReqAuthHeader);
+    Mockito.when(request.getRequestURI()).thenReturn(WebServerConstants.SPENGO_LOGIN_RESOURCE_PATH);
+
+    assertEquals(spnegoAuthenticator.validateRequest(request, response, false), Authentication.UNAUTHENTICATED);
+
+    verify(session, never()).setAttribute(SessionAuthentication.__J_AUTHENTICATED, null);
+    verify(response, never()).sendError(401);
+    verify(response, never()).setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), HttpHeader.NEGOTIATE.asString());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java
new file mode 100644
index 0000000..51171cd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.spnego;
+
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.categories.SecurityTest;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.rpc.security.AuthenticatorProviderImpl;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.server.rest.auth.DrillHttpSecurityHandlerProvider;
+import org.apache.drill.exec.server.rest.auth.DrillSpnegoLoginService;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
+import org.eclipse.jetty.server.UserIdentity;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import sun.security.jgss.GSSUtil;
+
+import javax.security.auth.Subject;
+import java.lang.reflect.Field;
+import java.security.PrivilegedExceptionAction;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for validating {@link DrillSpnegoLoginService}
+ */
+@Ignore("See DRILL-5387")
+@Category(SecurityTest.class)
+public class TestSpnegoAuthentication {
+
+  private static KerberosHelper spnegoHelper;
+
+  private static final String primaryName = "HTTP";
+
+  private static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    spnegoHelper = new KerberosHelper(TestSpnegoAuthentication.class.getSimpleName(), primaryName);
+    spnegoHelper.setupKdc(dirTestWatcher.getTmpDir());
+
+
+    sun.security.krb5.Config.refresh();
+
+    // (2) Reset the default realm.
+    final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
+    defaultRealm.setAccessible(true);
+    defaultRealm.set(null, KerberosUtil.getDefaultRealm());
+  }
+
+  /**
+   * Both SPNEGO and FORM mechanism is enabled for WebServer in configuration. Test to see if the respective security
+   * handlers are created successfully or not.
+   * @throws Exception
+   */
+  @Test
+  public void testSPNEGOAndFORMEnabled() throws Exception {
+
+    final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("form", "spnego")))
+        .withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL))
+        .withValue(ExecConstants.HTTP_SPNEGO_KEYTAB,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString())),
+        false);
+
+    final ScanResult scanResult = ClassPathScanner.fromPrescan(newConfig);
+    final AuthenticatorProviderImpl authenticatorProvider = Mockito.mock(AuthenticatorProviderImpl.class);
+    Mockito.when(authenticatorProvider.containsFactory(PlainFactory.SIMPLE_NAME)).thenReturn(true);
+
+    final DrillbitContext context = Mockito.mock(DrillbitContext.class);
+    Mockito.when(context.getClasspathScan()).thenReturn(scanResult);
+    Mockito.when(context.getConfig()).thenReturn(newConfig);
+    Mockito.when(context.getAuthProvider()).thenReturn(authenticatorProvider);
+
+    final DrillHttpSecurityHandlerProvider securityProvider = new DrillHttpSecurityHandlerProvider(newConfig, context);
+    assertTrue(securityProvider.isFormEnabled());
+    assertTrue(securityProvider.isSpnegoEnabled());
+  }
+
+  /**
+   * Validate if FORM security handler is created successfully when only form is configured as auth mechanism
+   * @throws Exception
+   */
+  @Test
+  public void testOnlyFORMEnabled() throws Exception {
+
+    final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+        .withValue(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("form")))
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL))
+        .withValue(ExecConstants.HTTP_SPNEGO_KEYTAB,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString())),
+        false);
+
+    final ScanResult scanResult = ClassPathScanner.fromPrescan(newConfig);
+    final AuthenticatorProviderImpl authenticatorProvider = Mockito.mock(AuthenticatorProviderImpl.class);
+    Mockito.when(authenticatorProvider.containsFactory(PlainFactory.SIMPLE_NAME)).thenReturn(true);
+
+    final DrillbitContext context = Mockito.mock(DrillbitContext.class);
+    Mockito.when(context.getClasspathScan()).thenReturn(scanResult);
+    Mockito.when(context.getConfig()).thenReturn(newConfig);
+    Mockito.when(context.getAuthProvider()).thenReturn(authenticatorProvider);
+
+    final DrillHttpSecurityHandlerProvider securityProvider = new DrillHttpSecurityHandlerProvider(newConfig, context);
+    assertTrue(securityProvider.isFormEnabled());
+    assertTrue(!securityProvider.isSpnegoEnabled());
+  }
+
+  /**
+   * Validate failure in creating FORM security handler when PAM authenticator is absent. PAM authenticator is provided
+   * via {@link PlainFactory#getAuthenticator()}
+   * @throws Exception
+   */
+  @Test
+  public void testFORMEnabledWithPlainDisabled() throws Exception {
+    try {
+      final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+          .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS,
+              ConfigValueFactory.fromIterable(Lists.newArrayList("form")))
+          .withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL))
+          .withValue(ExecConstants.HTTP_SPNEGO_KEYTAB,
+              ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString())),
+          false);
+
+      final ScanResult scanResult = ClassPathScanner.fromPrescan(newConfig);
+      final AuthenticatorProviderImpl authenticatorProvider = Mockito.mock(AuthenticatorProviderImpl.class);
+      Mockito.when(authenticatorProvider.containsFactory(PlainFactory.SIMPLE_NAME)).thenReturn(false);
+
+      final DrillbitContext context = Mockito.mock(DrillbitContext.class);
+      Mockito.when(context.getClasspathScan()).thenReturn(scanResult);
+      Mockito.when(context.getConfig()).thenReturn(newConfig);
+      Mockito.when(context.getAuthProvider()).thenReturn(authenticatorProvider);
+
+      final DrillHttpSecurityHandlerProvider securityProvider =
+          new DrillHttpSecurityHandlerProvider(newConfig, context);
+      fail();
+    } catch(Exception ex) {
+      assertTrue(ex instanceof DrillbitStartupException);
+    }
+  }
+
+  /**
+   * Validate only SPNEGO security handler is configured properly when enabled via configuration
+   * @throws Exception
+   */
+  @Test
+  public void testOnlySPNEGOEnabled() throws Exception {
+
+    final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+        .withValue(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("spnego")))
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL))
+        .withValue(ExecConstants.HTTP_SPNEGO_KEYTAB,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString())),
+        false);
+
+    final ScanResult scanResult = ClassPathScanner.fromPrescan(newConfig);
+    final AuthenticatorProviderImpl authenticatorProvider = Mockito.mock(AuthenticatorProviderImpl.class);
+    Mockito.when(authenticatorProvider.containsFactory(PlainFactory.SIMPLE_NAME)).thenReturn(false);
+
+    final DrillbitContext context = Mockito.mock(DrillbitContext.class);
+    Mockito.when(context.getClasspathScan()).thenReturn(scanResult);
+    Mockito.when(context.getConfig()).thenReturn(newConfig);
+    Mockito.when(context.getAuthProvider()).thenReturn(authenticatorProvider);
+
+    final DrillHttpSecurityHandlerProvider securityProvider = new DrillHttpSecurityHandlerProvider(newConfig, context);
+
+    assertTrue(!securityProvider.isFormEnabled());
+    assertTrue(securityProvider.isSpnegoEnabled());
+  }
+
+  /**
+   * Validate when none of the security mechanism is specified in the
+   * {@link ExecConstants#HTTP_AUTHENTICATION_MECHANISMS}, FORM security handler is still configured correctly when
+   * authentication is enabled along with PAM authenticator module.
+   * @throws Exception
+   */
+  @Test
+  public void testConfigBackwardCompatibility() throws Exception {
+
+    final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true)),
+        false);
+
+    final ScanResult scanResult = ClassPathScanner.fromPrescan(newConfig);
+    final AuthenticatorProviderImpl authenticatorProvider = Mockito.mock(AuthenticatorProviderImpl.class);
+    Mockito.when(authenticatorProvider.containsFactory(PlainFactory.SIMPLE_NAME)).thenReturn(true);
+
+    final DrillbitContext context = Mockito.mock(DrillbitContext.class);
+    Mockito.when(context.getClasspathScan()).thenReturn(scanResult);
+    Mockito.when(context.getConfig()).thenReturn(newConfig);
+    Mockito.when(context.getAuthProvider()).thenReturn(authenticatorProvider);
+
+    final DrillHttpSecurityHandlerProvider securityProvider = new DrillHttpSecurityHandlerProvider(newConfig, context);
+
+    assertTrue(securityProvider.isFormEnabled());
+    assertTrue(!securityProvider.isSpnegoEnabled());
+  }
+
+  /**
+   * Validate successful {@link DrillSpnegoLoginService#login(String, Object)} when provided with client token for a
+   * configured service principal.
+   * @throws Exception
+   */
+  @Test
+  public void testDrillSpnegoLoginService() throws Exception {
+
+    // Create client subject using it's principal and keytab
+    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(spnegoHelper.CLIENT_PRINCIPAL,
+            spnegoHelper.clientKeytab.getAbsoluteFile());
+
+    // Generate a SPNEGO token for the peer SERVER_PRINCIPAL from this CLIENT_PRINCIPAL
+    final String token = Subject.doAs(clientSubject, new PrivilegedExceptionAction<String>() {
+      @Override
+      public String run() throws Exception {
+
+        final GSSManager gssManager = GSSManager.getInstance();
+        GSSContext gssContext = null;
+        try {
+          final Oid oid = GSSUtil.GSS_SPNEGO_MECH_OID;
+          final GSSName serviceName = gssManager.createName(spnegoHelper.SERVER_PRINCIPAL, GSSName.NT_USER_NAME, oid);
+
+          gssContext = gssManager.createContext(serviceName, oid, null, GSSContext.DEFAULT_LIFETIME);
+          gssContext.requestCredDeleg(true);
+          gssContext.requestMutualAuth(true);
+
+          byte[] outToken = new byte[0];
+          outToken = gssContext.initSecContext(outToken, 0, outToken.length);
+          return Base64.encodeBase64String(outToken);
+
+        } finally {
+          if (gssContext != null) {
+            gssContext.dispose();
+          }
+        }
+      }
+    });
+
+    // Create a DrillbitContext with service principal and keytab for DrillSpnegoLoginService
+    final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+        .withValue(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("spnego")))
+        .withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL))
+        .withValue(ExecConstants.HTTP_SPNEGO_KEYTAB,
+            ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString())),
+        false);
+
+
+    final SystemOptionManager optionManager = Mockito.mock(SystemOptionManager.class);
+    Mockito.when(optionManager.getOption(ExecConstants.ADMIN_USERS_VALIDATOR))
+        .thenReturn(ExecConstants.ADMIN_USERS_VALIDATOR.DEFAULT_ADMIN_USERS);
+    Mockito.when(optionManager.getOption(ExecConstants.ADMIN_USER_GROUPS_VALIDATOR))
+        .thenReturn(ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.DEFAULT_ADMIN_USER_GROUPS);
+
+    final DrillbitContext drillbitContext = Mockito.mock(DrillbitContext.class);
+    Mockito.when(drillbitContext.getConfig()).thenReturn(newConfig);
+    Mockito.when(drillbitContext.getOptionManager()).thenReturn(optionManager);
+
+    final DrillSpnegoLoginService loginService = new DrillSpnegoLoginService(drillbitContext);
+
+    // Authenticate the client using its SPNEGO token
+    final UserIdentity user = loginService.login(null, token);
+
+    // Validate the UserIdentity of authenticated client
+    assertTrue(user != null);
+    assertTrue(user.getUserPrincipal().getName().equals(spnegoHelper.CLIENT_PRINCIPAL));
+    assertTrue(user.isUserInRole("authenticated", null));
+  }
+
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    spnegoHelper.stopKdc();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java
new file mode 100644
index 0000000..7803b9a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.spnego;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.SecurityTest;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.rest.auth.SpnegoConfig;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.lang.reflect.Field;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for validating {@link SpnegoConfig}
+ */
+@Ignore("See DRILL-5387")
+@Category(SecurityTest.class)
+public class TestSpnegoConfig {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSpnegoConfig.class);
+
+  private static KerberosHelper spnegoHelper;
+
+  private static final String primaryName = "HTTP";
+
+  private static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    spnegoHelper = new KerberosHelper(TestSpnegoAuthentication.class.getSimpleName(), primaryName);
+    spnegoHelper.setupKdc(dirTestWatcher.getTmpDir());
+
+
+    sun.security.krb5.Config.refresh();
+
+    // (2) Reset the default realm.
+    final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
+    defaultRealm.setAccessible(true);
+    defaultRealm.set(null, KerberosUtil.getDefaultRealm());
+  }
+
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    spnegoHelper.stopKdc();
+  }
+
+  /**
+   * Test invalid {@link SpnegoConfig} with missing keytab and principal
+   * @throws Exception
+   */
+  @Test
+  public void testInvalidSpnegoConfig() throws Exception {
+    // Invalid configuration for SPNEGO
+    try {
+      final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+          .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+              ConfigValueFactory.fromIterable(Lists.newArrayList("plain")))
+          .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+              ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE)),
+          false);
+
+      final SpnegoConfig spnegoConfig = new SpnegoConfig(newConfig);
+      spnegoConfig.validateSpnegoConfig();
+      fail();
+    } catch (Exception ex) {
+      assertTrue(ex instanceof DrillException);
+    }
+  }
+
+  /**
+   * Invalid configuration with keytab only and missing principal
+   * @throws Exception
+   */
+  @Test
+  public void testSpnegoConfigOnlyKeytab() throws Exception {
+    try {
+      final DrillConfig newConfig = new DrillConfig(DrillConfig.create().withValue(ExecConstants.USER_AUTHENTICATION_ENABLED, ConfigValueFactory.fromAnyRef(true)).withValue(ExecConstants.AUTHENTICATION_MECHANISMS, ConfigValueFactory.fromIterable(Lists.newArrayList("plain"))).withValue(ExecConstants.HTTP_SPNEGO_KEYTAB, ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString())).withValue(ExecConstants.USER_AUTHENTICATOR_IMPL, ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE)), false);
+
+      final SpnegoConfig spnegoConfig = new SpnegoConfig(newConfig);
+      spnegoConfig.validateSpnegoConfig();
+      fail();
+    } catch (Exception ex) {
+      assertTrue(ex instanceof DrillException);
+    }
+  }
+
+  /**
+   * Invalid configuration with principal only and missing keytab
+   * @throws Exception
+   */
+  @Test
+  public void testSpnegoConfigOnlyPrincipal() throws Exception {
+    try {
+      final DrillConfig newConfig = new DrillConfig(DrillConfig.create().withValue(ExecConstants.USER_AUTHENTICATION_ENABLED, ConfigValueFactory.fromAnyRef(true)).withValue(ExecConstants.AUTHENTICATION_MECHANISMS, ConfigValueFactory.fromIterable(Lists.newArrayList("plain"))).withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL, ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL)).withValue(ExecConstants.USER_AUTHENTICATOR_IMPL, ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE)), false);
+
+      final SpnegoConfig spnegoConfig = new SpnegoConfig(newConfig);
+      spnegoConfig.validateSpnegoConfig();
+      fail();
+    } catch (Exception ex) {
+      assertTrue(ex instanceof DrillException);
+    }
+  }
+
+  /**
+   * Valid Configuration with both keytab & principal
+   * @throws Exception
+   */
+  @Test
+  public void testValidSpnegoConfig() throws Exception {
+
+    try {
+      final DrillConfig newConfig = new DrillConfig(DrillConfig.create()
+          .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+              ConfigValueFactory.fromIterable(Lists.newArrayList("plain")))
+          .withValue(ExecConstants.HTTP_SPNEGO_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(spnegoHelper.SERVER_PRINCIPAL))
+          .withValue(ExecConstants.HTTP_SPNEGO_KEYTAB,
+              ConfigValueFactory.fromAnyRef(spnegoHelper.serverKeytab.toString()))
+          .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+              ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE)),
+          false);
+
+      final SpnegoConfig spnegoConfig = new SpnegoConfig(newConfig);
+      spnegoConfig.validateSpnegoConfig();
+      UserGroupInformation ugi = spnegoConfig.getLoggedInUgi();
+      assertEquals(primaryName, ugi.getShortUserName());
+      assertEquals(spnegoHelper.SERVER_PRINCIPAL, ugi.getUserName());
+    } catch (Exception ex) {
+      fail();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 67fc89a..888a49b 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -202,6 +202,7 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio
         if (e instanceof BindException && allowPortHunting) {
           continue;
         }
+
         final UserException bindException =
             UserException
               .resourceError( e )


[3/7] drill git commit: DRILL-5963: Query state process improvements

Posted by ar...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
new file mode 100644
index 0000000..2443139
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.common.EventProcessor;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanResult;
+
+/**
+ * Is responsible for query transition from one state to another,
+ * incrementing / decrementing query statuses counters.
+ */
+public class QueryStateProcessor implements AutoCloseable {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStateProcessor.class);
+
+  private static final Counter planningQueries = DrillMetrics.getRegistry().counter("drill.queries.planning");
+  private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued");
+  private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running");
+  private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed");
+  private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded");
+  private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed");
+  private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled");
+
+  private final StateSwitch stateSwitch = new StateSwitch();
+
+  private final String queryIdString;
+  private final QueryManager queryManager;
+  private final DrillbitContext drillbitContext;
+  private final ForemanResult foremanResult;
+
+  private volatile QueryState state;
+
+  public QueryStateProcessor(String queryIdString, QueryManager queryManager, DrillbitContext drillbitContext, ForemanResult foremanResult) {
+    this.queryIdString = queryIdString;
+    this.queryManager = queryManager;
+    this.drillbitContext = drillbitContext;
+    this.foremanResult = foremanResult;
+    // initial query state is PREPARING
+    this.state = QueryState.PREPARING;
+  }
+
+  /**
+   * @return current query state
+   */
+  public QueryState getState() {
+    return state;
+  }
+
+  /**
+   * Moves one query state to another, will fail when requested query state transition is not allowed.
+   *
+   * @param newState new query state
+   * @param exception exception if failure occurred
+   */
+  public synchronized void moveToState(QueryState newState, Exception exception) {
+    logger.debug(queryIdString + ": State change requested {} --> {}", state, newState);
+
+    switch (state) {
+      case PREPARING:
+        preparing(newState, exception);
+        return;
+      case PLANNING:
+        planning(newState, exception);
+        return;
+      case ENQUEUED:
+        enqueued(newState, exception);
+        return;
+      case STARTING:
+        starting(newState, exception);
+        return;
+      case RUNNING:
+        running(newState, exception);
+        return;
+      case CANCELLATION_REQUESTED:
+        cancellationRequested(newState, exception);
+        return;
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state);
+        return;
+    }
+
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  /**
+   * Directly moves query from one state to another and updates ephemeral query store.
+   *
+   * @param newState new query state
+   */
+  public void recordNewState(final QueryState newState) {
+    state = newState;
+    queryManager.updateEphemeralState(newState);
+  }
+
+  /**
+   * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be terminated.
+   * For preparing, planning and enqueued states we cancel immediately since these states are done locally.
+   *
+   * Note this can be called from outside of run() on another thread, or after run() completes
+   */
+  public void cancel() {
+    switch (state) {
+      case PREPARING:
+      case PLANNING:
+      case ENQUEUED:
+        moveToState(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+
+      case STARTING:
+      case RUNNING:
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+
+      case CANCELLATION_REQUESTED:
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        // nothing to do
+        return;
+
+      default:
+        throw new IllegalStateException("Unable to cancel the query. Unexpected query state -> " + state);
+    }
+  }
+
+  /**
+   * Tells the foreman to move to a new state.<br>
+   * This will be added to the end of the event queue and will be processed once the foreman is ready
+   * to accept external events.
+   *
+   * @param newState the state to move to
+   * @param exception if not null, the exception that drove this state transition (usually a failure)
+   */
+  public void addToEventQueue(final QueryState newState, final Exception exception) {
+    stateSwitch.addEvent(newState, exception);
+  }
+
+  /**
+   * Starts processing all events that were enqueued while all fragments were sending out.
+   */
+  public void startProcessingEvents() {
+    try {
+      stateSwitch.start();
+    } catch (Exception ex) {
+      moveToState(QueryState.FAILED, ex);
+    }
+  }
+
+  /**
+   * On close set proper increment / decrement counters depending on final query state.
+   */
+  @Override
+  public void close() {
+    queryManager.markEndTime();
+
+    switch (state) {
+      case FAILED:
+        failedQueries.inc();
+        break;
+      case CANCELED:
+        canceledQueries.inc();
+        break;
+      case COMPLETED:
+        succeededQueries.inc();
+        break;
+    }
+
+    runningQueries.dec();
+    completedQueries.inc();
+  }
+
+
+  private void preparing(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case PLANNING:
+        queryManager.markStartTime();
+        runningQueries.inc();
+
+        recordNewState(newState);
+        planningQueries.inc();
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void planning(final QueryState newState, final Exception exception) {
+    planningQueries.dec();
+    queryManager.markPlanningEndTime();
+    switch (newState) {
+      case ENQUEUED:
+        recordNewState(newState);
+        enqueuedQueries.inc();
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void enqueued(final QueryState newState, final Exception exception) {
+    enqueuedQueries.dec();
+    queryManager.markQueueWaitEndTime();
+    switch (newState) {
+      case STARTING:
+        recordNewState(newState);
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void starting(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case RUNNING:
+        recordNewState(QueryState.RUNNING);
+        return;
+      case COMPLETED:
+        wrapUpCompletion();
+      case CANCELLATION_REQUESTED:
+        // since during starting state fragments are sent to the remote nodes,
+        // we don't want to cancel until they all are sent out
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+    }
+
+    checkCommonStates(newState, exception);
+  }
+
+  private void running(final QueryState newState, final Exception exception) {
+      /*
+       * For cases that cancel executing fragments, we have to record the new
+       * state first, because the cancellation of the local root fragment will
+       * cause this to be called recursively.
+       */
+    switch (newState) {
+      case CANCELLATION_REQUESTED: {
+        assert exception == null;
+        recordNewState(QueryState.CANCELLATION_REQUESTED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setCompleted(QueryState.CANCELED);
+        /*
+         * We don't close the foremanResult until we've gotten
+         * acknowledgments, which happens below in the case for current state
+         * == CANCELLATION_REQUESTED.
+         */
+        return;
+      }
+
+      case COMPLETED: {
+        wrapUpCompletion();
+        return;
+      }
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void cancellationRequested(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case FAILED:
+        if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
+          assert exception != null;
+          recordNewState(QueryState.FAILED);
+          foremanResult.setForceFailure(exception);
+        }
+
+        // proceed
+
+      case CANCELED:
+      case COMPLETED:
+        /*
+         * These amount to a completion of the cancellation requests' cleanup;
+         * now we can clean up and send the result.
+         */
+        foremanResult.close();
+        return;
+    }
+
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  private void wrapUpCancellation() {
+    recordNewState(QueryState.CANCELLATION_REQUESTED);
+    foremanResult.setCompleted(QueryState.CANCELED);
+  }
+
+  private void wrapUpCompletion() {
+    recordNewState(QueryState.COMPLETED);
+    foremanResult.setCompleted(QueryState.COMPLETED);
+    foremanResult.close();
+  }
+
+  private void checkCommonStates(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case FAILED:
+        assert exception != null;
+        recordNewState(QueryState.FAILED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setFailed(exception);
+        foremanResult.close();
+        return;
+    }
+
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  private class StateEvent {
+    final QueryState newState;
+    final Exception exception;
+
+    StateEvent(final QueryState newState, final Exception exception) {
+      this.newState = newState;
+      this.exception = exception;
+    }
+  }
+
+  private class StateSwitch extends EventProcessor<StateEvent> {
+    public void addEvent(final QueryState newState, final Exception exception) {
+      sendEvent(new StateEvent(newState, exception));
+    }
+
+    @Override
+    protected void processEvent(final StateEvent event) {
+      moveToState(event.newState, event.exception);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a0cf643..cb66ca3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -306,7 +306,7 @@ drill.exec: {
       size: 2,
       // Maximum wait time in the queue before the query times out and
       // fails.
-      timeout: 5000 // 5 seconds
+      timeout_ms: 5000 // 5 seconds
     }
   }
   memory: {

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 956cfc4..ec101d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.math3.util.Pair;
+import org.apache.drill.exec.work.foreman.FragmentsRunner;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
@@ -757,7 +758,7 @@ public class TestDrillbitResilience extends DrillTest {
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
     final String controls = Controls.newBuilder()
-    .addException(Foreman.class, exceptionDesc, exceptionClass)
+    .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
       .build();
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 51cdab7..edc401c 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -10377,6 +10377,22 @@ public final class UserBitShared {
        * </pre>
        */
       ENQUEUED(6, 6),
+      /**
+       * <code>PREPARING = 7;</code>
+       *
+       * <pre>
+       * query is at preparation stage, foreman is initializing
+       * </pre>
+       */
+      PREPARING(7, 7),
+      /**
+       * <code>PLANNING = 8;</code>
+       *
+       * <pre>
+       * query is at planning stage (includes logical or / and physical planning)
+       * </pre>
+       */
+      PLANNING(8, 8),
       ;
 
       /**
@@ -10427,6 +10443,22 @@ public final class UserBitShared {
        * </pre>
        */
       public static final int ENQUEUED_VALUE = 6;
+      /**
+       * <code>PREPARING = 7;</code>
+       *
+       * <pre>
+       * query is at preparation stage, foreman is initializing
+       * </pre>
+       */
+      public static final int PREPARING_VALUE = 7;
+      /**
+       * <code>PLANNING = 8;</code>
+       *
+       * <pre>
+       * query is at planning stage (includes logical or / and physical planning)
+       * </pre>
+       */
+      public static final int PLANNING_VALUE = 8;
 
 
       public final int getNumber() { return value; }
@@ -10440,6 +10472,8 @@ public final class UserBitShared {
           case 4: return FAILED;
           case 5: return CANCELLATION_REQUESTED;
           case 6: return ENQUEUED;
+          case 7: return PREPARING;
+          case 8: return PLANNING;
           default: return null;
         }
       }
@@ -23942,92 +23976,93 @@ public final class UserBitShared {
       "ield\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_le" +
       "ngth\030\005 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7\n\nNod" +
       "eStatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_footp" +
-      "rint\030\002 \001(\003\"\225\002\n\013QueryResult\0228\n\013query_stat" +
+      "rint\030\002 \001(\003\"\263\002\n\013QueryResult\0228\n\013query_stat" +
       "e\030\001 \001(\0162#.exec.shared.QueryResult.QueryS",
       "tate\022&\n\010query_id\030\002 \001(\0132\024.exec.shared.Que" +
       "ryId\022(\n\005error\030\003 \003(\0132\031.exec.shared.DrillP" +
-      "BError\"z\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007RU" +
-      "NNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006" +
-      "FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n\010" +
-      "ENQUEUED\020\006\"p\n\tQueryData\022&\n\010query_id\030\001 \001(" +
-      "\0132\024.exec.shared.QueryId\022\021\n\trow_count\030\002 \001" +
-      "(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.RecordBatc" +
-      "hDef\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005sta" +
-      "rt\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.Qu",
-      "eryResult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n" +
-      "\007foreman\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024" +
-      "\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001" +
-      "\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile" +
-      "\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004ty" +
-      "pe\030\002 \001(\0162\026.exec.shared.QueryType\022\r\n\005star" +
-      "t\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004p" +
-      "lan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.Drillb" +
-      "itEndpoint\0222\n\005state\030\010 \001(\0162#.exec.shared." +
-      "QueryResult.QueryState\022\027\n\017total_fragment",
-      "s\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020f" +
-      "ragment_profile\030\013 \003(\0132!.exec.shared.Majo" +
-      "rFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005err" +
-      "or\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_" +
-      "id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_" +
-      "json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWait" +
-      "End\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_n" +
-      "ame\030\025 \001(\t:\001-\"t\n\024MajorFragmentProfile\022\031\n\021" +
-      "major_fragment_id\030\001 \001(\005\022A\n\026minor_fragmen" +
-      "t_profile\030\002 \003(\0132!.exec.shared.MinorFragm",
-      "entProfile\"\350\002\n\024MinorFragmentProfile\022)\n\005s" +
-      "tate\030\001 \001(\0162\032.exec.shared.FragmentState\022(" +
-      "\n\005error\030\002 \001(\0132\031.exec.shared.DrillPBError" +
-      "\022\031\n\021minor_fragment_id\030\003 \001(\005\0226\n\020operator_" +
-      "profile\030\004 \003(\0132\034.exec.shared.OperatorProf" +
-      "ile\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(" +
-      "\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_use" +
-      "d\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.Drillbi" +
-      "tEndpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\rlast_p" +
-      "rogress\030\013 \001(\003\"\377\001\n\017OperatorProfile\0221\n\rinp",
-      "ut_profile\030\001 \003(\0132\032.exec.shared.StreamPro" +
-      "file\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_ty" +
-      "pe\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess" +
-      "_nanos\030\006 \001(\003\022#\n\033peak_local_memory_alloca" +
-      "ted\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.shared." +
-      "MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStrea" +
-      "mProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001" +
-      "(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tme" +
-      "tric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014dou" +
-      "ble_value\030\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\013",
-      "2\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022" +
-      "\032\n\022function_signature\030\002 \003(\t\"W\n\013SaslMessa" +
-      "ge\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006s" +
-      "tatus\030\003 \001(\0162\027.exec.shared.SaslStatus*5\n\n" +
-      "RpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020" +
-      "\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOG" +
-      "ICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022P" +
-      "REPARED_STATEMENT\020\005*\207\001\n\rFragmentState\022\013\n" +
-      "\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007R" +
-      "UNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n",
-      "\006FAILED\020\005\022\032\n\026CANCELLATION_REQUESTED\020\006*\360\005" +
-      "\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n" +
-      "\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_" +
-      "AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN" +
-      "\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007" +
-      "\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTIT" +
-      "ION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_R" +
-      "ECEIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022" +
-      "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" +
-      "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN",
-      "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" +
-      "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" +
-      "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" +
-      "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" +
-      "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" +
-      "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" +
-      "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" +
-      "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" +
-      "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" +
-      "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSasl",
-      "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001" +
-      "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" +
-      "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" +
-      "ec.protoB\rUserBitSharedH\001"
+      "BError\"\227\001\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007R" +
+      "UNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n" +
+      "\006FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n" +
+      "\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007\022\014\n\010PLANNING\020\010" +
+      "\"p\n\tQueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.s" +
+      "hared.QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030" +
+      "\003 \001(\0132\033.exec.shared.RecordBatchDef\"\330\001\n\tQ" +
+      "ueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222",
+      "\n\005state\030\003 \001(\0162#.exec.shared.QueryResult." +
+      "QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005" +
+      " \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014options_" +
+      "json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqueue_" +
+      "name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile\022 \n\002id\030\001 \001" +
+      "(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001(\0162\026" +
+      ".exec.shared.QueryType\022\r\n\005start\030\003 \001(\003\022\013\n" +
+      "\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022" +
+      "\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndpoint" +
+      "\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryResul",
+      "t.QueryState\022\027\n\017total_fragments\030\t \001(\005\022\032\n" +
+      "\022finished_fragments\030\n \001(\005\022;\n\020fragment_pr" +
+      "ofile\030\013 \003(\0132!.exec.shared.MajorFragmentP" +
+      "rofile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024" +
+      "\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022" +
+      "\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021 \001(\t" +
+      "\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 \001(\003\022" +
+      "\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 \001(\t:" +
+      "\001-\"t\n\024MajorFragmentProfile\022\031\n\021major_frag" +
+      "ment_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030",
+      "\002 \003(\0132!.exec.shared.MinorFragmentProfile" +
+      "\"\350\002\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\016" +
+      "2\032.exec.shared.FragmentState\022(\n\005error\030\002 " +
+      "\001(\0132\031.exec.shared.DrillPBError\022\031\n\021minor_" +
+      "fragment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 " +
+      "\003(\0132\034.exec.shared.OperatorProfile\022\022\n\nsta" +
+      "rt_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memor" +
+      "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" +
+      "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" +
+      "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 ",
+      "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" +
+      "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" +
+      "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" +
+      "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" +
+      "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" +
+      "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" +
+      "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" +
+      "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" +
+      "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " +
+      "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030",
+      "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" +
+      "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" +
+      "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" +
+      "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" +
+      "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" +
+      "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" +
+      "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" +
+      "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" +
+      "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
+      "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014",
+      "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
+      "\032\n\026CANCELLATION_REQUESTED\020\006*\360\005\n\020CoreOper" +
+      "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
+      "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
+      "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
+      "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" +
+      "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" +
+      "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" +
+      "\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTI" +
+      "ON_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGA",
+      "TE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022" +
+      "\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026P" +
+      "ARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN" +
+      "\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SC" +
+      "AN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_S" +
+      "CAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020" +
+      "\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_" +
+      "SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_" +
+      "CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW" +
+      "\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCA",
+      "N\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSaslStatus\022\020\n\014" +
+      "SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_I" +
+      "N_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_F" +
+      "AILED\020\004B.\n\033org.apache.drill.exec.protoB\r" +
+      "UserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
index 7b2a273..a53dc42 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
@@ -43,7 +43,9 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
         CANCELED(3),
         FAILED(4),
         CANCELLATION_REQUESTED(5),
-        ENQUEUED(6);
+        ENQUEUED(6),
+        PREPARING(7),
+        PLANNING(8);
         
         public final int number;
         
@@ -68,6 +70,8 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
                 case 4: return FAILED;
                 case 5: return CANCELLATION_REQUESTED;
                 case 6: return ENQUEUED;
+                case 7: return PREPARING;
+                case 8: return PLANNING;
                 default: return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 086b98a..205611b 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -167,6 +167,8 @@ message QueryResult {
 	  FAILED = 4;
 	  CANCELLATION_REQUESTED = 5; // cancellation has been requested, and is being processed
 	  ENQUEUED = 6; // query has been enqueued. this is pre-starting.
+	  PREPARING = 7; // query is at preparation stage, foreman is initializing
+	  PLANNING = 8; // query is at planning stage (includes logical or / and physical planning)
 	}
 
 	optional QueryState query_state = 1;


[7/7] drill git commit: DRILL-6020: Fix NullPointerException when querying JSON untyped path with Union setting on

Posted by ar...@apache.org.
DRILL-6020: Fix NullPointerException when querying JSON untyped path with Union setting on

closes #1068


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b4ffa401
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b4ffa401
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b4ffa401

Branch: refs/heads/master
Commit: b4ffa40127c040d2f8d9ebe2fd4623dfac8c7724
Parents: ce80da8
Author: mitchel <mi...@hotmail.com>
Authored: Tue Dec 12 09:57:55 2017 -0500
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jan 2 15:54:05 2018 +0200

----------------------------------------------------------------------
 .../drill/exec/vector/complex/FieldIdUtil.java  | 32 ++++++++++++--------
 .../vector/complex/writer/TestJsonReader.java   | 28 +++++++++++++++++
 2 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b4ffa401/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
index 2d3c13c..6e72b6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
@@ -31,21 +31,29 @@ public class FieldIdUtil {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldIdUtil.class);
 
   public static TypedFieldId getFieldIdIfMatchesUnion(UnionVector unionVector, TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg) {
-    if (seg.isNamed()) {
-      ValueVector v = unionVector.getMap();
-      if (v != null) {
-        return getFieldIdIfMatches(v, builder, addToBreadCrumb, seg);
-      } else {
-        return null;
+    if (seg != null) {
+      if (seg.isNamed()) {
+        ValueVector v = unionVector.getMap();
+        if (v != null) {
+          return getFieldIdIfMatches(v, builder, addToBreadCrumb, seg);
+        } else {
+          return null;
+        }
+      } else if (seg.isArray()) {
+        ValueVector v = unionVector.getList();
+        if (v != null) {
+          return getFieldIdIfMatches(v, builder, addToBreadCrumb, seg);
+        } else {
+          return null;
+        }
       }
-    } else if (seg.isArray()) {
-      ValueVector v = unionVector.getList();
-      if (v != null) {
-        return getFieldIdIfMatches(v, builder, addToBreadCrumb, seg);
-      } else {
-        return null;
+    } else {
+      if (addToBreadCrumb) {
+        builder.intermediateType(unionVector.getField().getType());
       }
+      return builder.finalType(unionVector.getField().getType()).build();
     }
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b4ffa401/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 3e3580f..da1cddb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -35,6 +35,8 @@ import java.nio.file.Paths;
 import java.util.List;
 import java.util.zip.GZIPOutputStream;
 
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillFileUtils;
@@ -720,4 +722,30 @@ public class TestJsonReader extends BaseTestQuery {
       .baselineValues("1", "2", "1", null, "a")
       .go();
   }
+
+  @Test // DRILL-6020
+  public void testUntypedPathWithUnion() throws Exception {
+    String fileName = "table.json";
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dirTestWatcher.getRootDir(), fileName)))) {
+      writer.write("{\"rk\": {\"a\": {\"b\": \"1\"}}}");
+      writer.write("{\"rk\": {\"a\": \"2\"}}");
+    }
+
+    JsonStringHashMap<String, Text> map = new JsonStringHashMap<>();
+    map.put("b", new Text("1"));
+
+    try {
+      testBuilder()
+        .sqlQuery("select t.rk.a as a from dfs.`%s` t", fileName)
+        .ordered()
+        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type`=true")
+        .baselineColumns("a")
+        .baselineValues(map)
+        .baselineValues("2")
+        .go();
+
+    } finally {
+      testNoResult("alter session reset `exec.enable_union_type`");
+    }
+  }
 }


[5/7] drill git commit: DRILL-5973: Support injection of time-bound pauses in server

Posted by ar...@apache.org.
DRILL-5973: Support injection of time-bound pauses in server

Support pause injections in the test framework that are time-bound, to allow for testing high latency scenarios.
e.g. delayed server response to the Drill client allows for test a server-induced timeout
Added tests to verify the commit.

closes #1055


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3df11e19
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3df11e19
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3df11e19

Branch: refs/heads/master
Commit: 3df11e1905ad384a7205d9daa0e2832394a6b890
Parents: 0343518
Author: Kunal Khatua <kk...@maprtech.com>
Authored: Wed Dec 20 12:52:12 2017 -0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jan 2 15:43:30 2018 +0200

----------------------------------------------------------------------
 .../testing/CountDownLatchInjectionImpl.java    |   4 +-
 .../drill/exec/testing/ExceptionInjection.java  |   2 +-
 .../exec/testing/ExecutionControlsInjector.java |   9 +-
 .../apache/drill/exec/testing/Injection.java    |   8 +-
 .../drill/exec/testing/PauseInjection.java      |  24 ++++-
 .../org/apache/drill/exec/testing/Controls.java |  29 ++++++
 .../exec/testing/ControlsInjectionUtil.java     |  27 +++++
 .../drill/exec/testing/TestPauseInjection.java  | 101 +++++++++++++++++++
 8 files changed, 194 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
index 7584bad..e4eeeb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 
-import java.util.concurrent.CountDownLatch;
-
 /**
  * See {@link org.apache.drill.exec.testing.CountDownLatchInjection} Degenerates to
  * {@link org.apache.drill.exec.testing.PauseInjection#pause}, if initialized to zero count. In any case, this injection
@@ -42,7 +40,7 @@ public class CountDownLatchInjectionImpl extends Injection implements CountDownL
                                       @JsonProperty("port") final int port,
                                       @JsonProperty("siteClass") final String siteClass,
                                       @JsonProperty("desc") final String desc) throws InjectionConfigurationException {
-    super(address, port, siteClass, desc, 0, 1);
+    super(address, port, siteClass, desc, 0, 1, 0L);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
index 61f0d67..aff4dba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
@@ -44,7 +44,7 @@ public class ExceptionInjection extends Injection {
                              @JsonProperty("nSkip") final int nSkip,
                              @JsonProperty("nFire") final int nFire,
                              @JsonProperty("exceptionClass") String classString) throws InjectionConfigurationException {
-    super(address, port, siteClass, desc, nSkip, nFire);
+    super(address, port, siteClass, desc, nSkip, nFire, 0L);
     final Class<?> clazz;
     try {
       clazz = Class.forName(classString);

http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index d8979d2..3a74fee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.testing;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.drill.exec.ops.FragmentContext;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -81,7 +83,12 @@ public class ExecutionControlsInjector implements ControlsInjector {
       executionControls.lookupPauseInjection(this, desc);
 
     if (pauseInjection != null) {
-      logger.debug("Pausing at {}", desc);
+      long pauseDuration = pauseInjection.getMsPause();
+      if ( pauseDuration > 0L) {
+        logger.debug("Pausing at {} for {} sec", desc, TimeUnit.MILLISECONDS.toSeconds(pauseDuration) );
+      } else {
+        logger.debug("Pausing at {}", desc);
+      }
       pauseInjection.pause();
       logger.debug("Resuming at {}", desc);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
index 08ade51..1daf639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -32,9 +32,10 @@ public abstract class Injection {
   protected final String desc; // description of the injection site; useful for multiple exception injections in a single class
   private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
   private final AtomicInteger nFire;  // the number of times to do the injection, after any skips; starts > 0
+  private final long msPause; // duration of the injection (only applies to pause injections)
 
   protected Injection(final String address, final int port, final String siteClass, final String desc,
-                      final int nSkip, final int nFire) throws InjectionConfigurationException {
+                      final int nSkip, final int nFire, final long msPause) throws InjectionConfigurationException {
     if (desc == null || desc.isEmpty()) {
       throw new InjectionConfigurationException("Injection desc is null or empty.");
     }
@@ -57,6 +58,7 @@ public abstract class Injection {
     this.desc = desc;
     this.nSkip = new AtomicInteger(nSkip);
     this.nFire = new AtomicInteger(nFire);
+    this.msPause = msPause;
   }
 
   /**
@@ -81,4 +83,8 @@ public abstract class Injection {
     return address == null ||
       (address.equals(endpoint.getAddress()) && port == endpoint.getUserPort());
   }
+
+  public long getMsPause() {
+    return msPause;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index fc4d8ec..89f47c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -21,8 +21,10 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.concurrent.TimeUnit;
+
 import org.apache.drill.common.concurrent.ExtendedLatch;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 
 /**
  * Injection for a single pause. Pause indefinitely until signalled. This class is used internally for tracking
@@ -43,15 +45,29 @@ public class PauseInjection extends Injection {
                          @JsonProperty("port") final int port,
                          @JsonProperty("siteClass") final String siteClass,
                          @JsonProperty("desc") final String desc,
-                         @JsonProperty("nSkip") final int nSkip) throws InjectionConfigurationException {
-    super(address, port, siteClass, desc, nSkip, 1);
+                         @JsonProperty("nSkip") final int nSkip,
+                         @JsonProperty("msPause") final long msPause) throws InjectionConfigurationException {
+    //nFire is 1 since we will inject pauses only once
+    super(address, port, siteClass, desc, nSkip, 1, msPause);
   }
 
+  /**
+   * Pause indefinitely, unless a duration exists
+   */
   public void pause() {
     if (!injectNow()) {
       return;
     }
-    latch.awaitUninterruptibly();
+    if (this.getMsPause() > 0L) {
+      try {
+        latch.await(getMsPause(), TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        //Unpausing self as this is timed
+        unpause();
+      }
+    } else {
+      latch.awaitUninterruptibly();
+    }
   }
 
   public void interruptiblePause() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
index 36ccee3..bb7f737 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
@@ -138,6 +138,20 @@ public class Controls {
     }
 
     /**
+     * Adds a time-bound pause injection to the controls builder with the given parameters.
+     *
+     * @param siteClass class where the pause should happen
+     * @param desc      descriptor for the pause site in the site class
+     * @param nSkip     number of times to skip before firing
+     * @param msPause     duration of the pause in millisec
+     * @return this builder
+     */
+    public Builder addTimedPause(final Class<?> siteClass, final String desc, final int nSkip, final long msPause) {
+      injections.add(ControlsInjectionUtil.createTimedPause(siteClass, desc, nSkip, msPause));
+      return this;
+    }
+
+    /**
      * Adds a pause injection to the controls builder with the given parameters. The pause is not skipped i.e. the pause
      * happens when execution reaches the site.
      *
@@ -164,6 +178,21 @@ public class Controls {
     }
 
     /**
+     * Adds a time-bound pause injection (for the specified drillbit) to the controls builder with the given parameters.
+     *
+     * @param siteClass class where the pause should happen
+     * @param desc      descriptor for the pause site in the site class
+     * @param nSkip     number of times to skip before firing
+     * @param msPause     duration of the pause in millisec
+     * @return this builder
+     */
+    public Builder addTimedPauseOnBit(final Class<?> siteClass, final String desc,
+                                 final DrillbitEndpoint endpoint, final int nSkip, final long msPause) {
+      injections.add(ControlsInjectionUtil.createTimedPauseOnBit(siteClass, desc, nSkip, endpoint, msPause));
+      return this;
+    }
+
+    /**
      * Adds a pause injection (for the specified drillbit) to the controls builder with the given parameters. The pause
      * is not skipped i.e. the pause happens when execution reaches the site.
      *

http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
index c4b725c..e3b0b0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
@@ -133,6 +133,18 @@ public class ControlsInjectionUtil {
   }
 
   /**
+   * Create a time-bound pause injection. Note this format is not directly accepted by the injection mechanism. Use the
+   * {@link Controls} to build exceptions.
+   */
+  public static String createTimedPause(final Class<?> siteClass, final String desc, final int nSkip, final long msPause) {
+    return "{ \"type\" : \"pause\"," +
+      "\"siteClass\" : \"" + siteClass.getName() + "\","
+      + "\"desc\" : \"" + desc + "\","
+      + "\"nSkip\" : " + nSkip + ","
+      + "\"msPause\" : " + msPause + "}";
+  }
+
+  /**
    * Create a pause injection on a specific bit. Note this format is not directly accepted by the injection
    * mechanism. Use the {@link Controls} to build exceptions.
    */
@@ -147,6 +159,21 @@ public class ControlsInjectionUtil {
   }
 
   /**
+   * Create a pause injection on a specific bit. Note this format is not directly accepted by the injection
+   * mechanism. Use the {@link Controls} to build exceptions.
+   */
+  public static String createTimedPauseOnBit(final Class<?> siteClass, final String desc, final int nSkip,
+                                        final DrillbitEndpoint endpoint, final long msPause) {
+    return "{ \"type\" : \"pause\"," +
+      "\"siteClass\" : \"" + siteClass.getName() + "\","
+      + "\"desc\" : \"" + desc + "\","
+      + "\"nSkip\" : " + nSkip + ","
+      + "\"msPause\" : " + msPause + ","
+      + "\"address\":\"" + endpoint.getAddress() + "\","
+      + "\"port\":\"" + endpoint.getUserPort() + "\"}";
+  }
+
+  /**
    * Create a latch injection. Note this format is not directly accepted by the injection mechanism. Use the
    * {@link Controls} to build exceptions.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/3df11e19/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 2e8aa02..571a793 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -145,6 +145,34 @@ public class TestPauseInjection extends BaseTestQuery {
   }
 
   @Test
+  public void timedPauseInjected() {
+    final long pauseDuration = 2000L;
+    final long expectedDuration = pauseDuration;
+    final ExtendedLatch trigger = new ExtendedLatch(1);
+    final Pointer<Exception> ex = new Pointer<>();
+    final String controls = Controls.newBuilder()
+      .addTimedPause(DummyClass.class, DummyClass.PAUSES, 0, pauseDuration)
+      .build();
+
+    ControlsInjectionUtil.setControls(session, controls);
+
+    final QueryContext queryContext = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
+    //We don't need a ResumingThread because this should automatically resume
+
+    // test that the pause happens
+    final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+    final long actualDuration = dummyClass.pauses();
+    assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration),
+      expectedDuration <= actualDuration);
+    assertTrue("No exception should be thrown.", ex.value == null);
+    try {
+      queryContext.close();
+    } catch (final Exception e) {
+      fail("Failed to close query context: " + e);
+    }
+  }
+
+  @Test
   public void pauseOnSpecificBit() {
     final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
     final ZookeeperHelper zkHelper = new ZookeeperHelper();
@@ -215,4 +243,77 @@ public class TestPauseInjection extends BaseTestQuery {
       zkHelper.stopZookeeper();
     }
   }
+
+
+  @Test
+  public void timedPauseOnSpecificBit() {
+    final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
+    final ZookeeperHelper zkHelper = new ZookeeperHelper();
+    zkHelper.startZookeeper(1);
+
+    final long pauseDuration = 2000L;
+    final long expectedDuration = pauseDuration;
+
+    try {
+      // Creating two drillbits
+      final Drillbit drillbit1, drillbit2;
+      final DrillConfig drillConfig = zkHelper.getConfig();
+      try {
+        drillbit1 = Drillbit.start(drillConfig, remoteServiceSet);
+        drillbit2 = Drillbit.start(drillConfig, remoteServiceSet);
+      } catch (final DrillbitStartupException e) {
+        throw new RuntimeException("Failed to start two drillbits.", e);
+      }
+
+      final DrillbitContext drillbitContext1 = drillbit1.getContext();
+      final DrillbitContext drillbitContext2 = drillbit2.getContext();
+
+      final UserSession session = UserSession.Builder.newBuilder()
+        .withCredentials(UserCredentials.newBuilder()
+          .setUserName("foo")
+          .build())
+        .withUserProperties(UserProperties.getDefaultInstance())
+        .withOptionManager(drillbitContext1.getOptionManager())
+        .build();
+
+      final DrillbitEndpoint drillbitEndpoint1 = drillbitContext1.getEndpoint();
+      final String controls = Controls.newBuilder()
+        .addTimedPauseOnBit(DummyClass.class, DummyClass.PAUSES, drillbitEndpoint1, 0, pauseDuration)
+        .build();
+
+      ControlsInjectionUtil.setControls(session, controls);
+      {
+        final ExtendedLatch trigger = new ExtendedLatch(1);
+        final Pointer<Exception> ex = new Pointer<>();
+        final QueryContext queryContext = new QueryContext(session, drillbitContext1, QueryId.getDefaultInstance());
+
+        // test that the pause happens
+        final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+        final long actualDuration = dummyClass.pauses();
+        assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration), expectedDuration <= actualDuration);
+        assertTrue("No exception should be thrown.", ex.value == null);
+        try {
+          queryContext.close();
+        } catch (final Exception e) {
+          fail("Failed to close query context: " + e);
+        }
+      }
+
+      {
+        final ExtendedLatch trigger = new ExtendedLatch(1);
+        final QueryContext queryContext = new QueryContext(session, drillbitContext2, QueryId.getDefaultInstance());
+
+        // if the resume did not happen, the test would hang
+        final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+        dummyClass.pauses();
+        try {
+          queryContext.close();
+        } catch (final Exception e) {
+          fail("Failed to close query context: " + e);
+        }
+      }
+    } finally {
+      zkHelper.stopZookeeper();
+    }
+  }
 }


[2/7] drill git commit: DRILL-5425: Support HTTP Kerberos auth using SPNEGO

Posted by ar...@apache.org.
DRILL-5425: Support HTTP Kerberos auth using SPNEGO

closes #1040


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/adee4614
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/adee4614
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/adee4614

Branch: refs/heads/master
Commit: adee46149734908ad20568951c683e49e88a67a3
Parents: e25c58f
Author: Sindhuri Rayavaram <sr...@mapr.com>
Authored: Mon Sep 11 16:56:22 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jan 2 15:43:10 2018 +0200

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   4 +-
 .../exec/server/rest/LogInLogOutResources.java  | 124 +++++--
 .../server/rest/ViewableWithPermissions.java    |  21 +-
 .../drill/exec/server/rest/WebServer.java       |  52 ++-
 .../exec/server/rest/WebServerConstants.java    |  45 +++
 .../server/rest/auth/AuthDynamicFeature.java    |   9 +-
 .../server/rest/auth/DrillErrorHandler.java     |  45 +++
 .../DrillHttpConstraintSecurityHandler.java     |  61 ++++
 .../auth/DrillHttpSecurityHandlerProvider.java  | 184 +++++++++++
 .../rest/auth/DrillSpnegoAuthenticator.java     | 192 +++++++++++
 .../rest/auth/DrillSpnegoLoginService.java      | 149 +++++++++
 .../server/rest/auth/DrillUserPrincipal.java    |   6 +-
 .../server/rest/auth/FormSecurityHanlder.java   |  49 +++
 .../exec/server/rest/auth/SpnegoConfig.java     | 112 +++++++
 .../server/rest/auth/SpnegoSecurityHandler.java |  36 ++
 .../src/main/resources/drill-module.conf        |   4 +-
 .../src/main/resources/rest/generic.ftl         |   2 +-
 .../src/main/resources/rest/mainLogin.ftl       |  32 ++
 .../drill/exec/rpc/data/TestBitBitKerberos.java |   3 +-
 .../drill/exec/rpc/security/KerberosHelper.java |   8 +-
 .../rpc/user/security/TestUserBitKerberos.java  |   2 +-
 .../security/TestUserBitKerberosEncryption.java |   2 +-
 .../spnego/TestDrillSpnegoAuthenticator.java    | 286 ++++++++++++++++
 .../rest/spnego/TestSpnegoAuthentication.java   | 326 +++++++++++++++++++
 .../server/rest/spnego/TestSpnegoConfig.java    | 167 ++++++++++
 .../org/apache/drill/exec/rpc/BasicServer.java  |   1 +
 26 files changed, 1850 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 52aa52d..5059b4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -30,7 +30,6 @@ import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidat
 import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
 import org.apache.drill.exec.server.options.TypeValidators.RangeLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
-import org.apache.drill.exec.server.options.TypeValidators.MaxWidthValidator;
 import org.apache.drill.exec.server.options.TypeValidators.AdminUsersValidator;
 import org.apache.drill.exec.server.options.TypeValidators.AdminUserGroupsValidator;
 import org.apache.drill.exec.testing.ExecutionControls;
@@ -149,6 +148,9 @@ public final class ExecConstants {
   public static final String HTTP_KEYSTORE_PASSWORD = SSL_KEYSTORE_PASSWORD;
   public static final String HTTP_TRUSTSTORE_PATH = SSL_TRUSTSTORE_PATH;
   public static final String HTTP_TRUSTSTORE_PASSWORD = SSL_TRUSTSTORE_PASSWORD;
+  public static final String HTTP_AUTHENTICATION_MECHANISMS = "drill.exec.http.auth.mechanisms";
+  public static final String HTTP_SPNEGO_PRINCIPAL = "drill.exec.http.auth.spnego.principal";
+  public static final String HTTP_SPNEGO_KEYTAB = "drill.exec.http.auth.spnego.keytab";
   public static final String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
   public static final String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
   public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogInLogOutResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogInLogOutResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogInLogOutResources.java
index 20cd6da..34ac4d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogInLogOutResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogInLogOutResources.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,18 @@
  */
 package org.apache.drill.exec.server.rest;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.AuthStringUtil;
+import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature;
+import org.apache.drill.exec.work.WorkManager;
+import org.eclipse.jetty.security.authentication.FormAuthenticator;
+import org.eclipse.jetty.util.security.Constraint;
+import org.glassfish.jersey.server.mvc.Viewable;
+
 import javax.annotation.security.PermitAll;
+import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.HttpSession;
@@ -31,33 +42,26 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.SecurityContext;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature;
-import org.eclipse.jetty.security.authentication.FormAuthenticator;
-import org.glassfish.jersey.server.mvc.Viewable;
-
 import java.net.URI;
 import java.net.URLDecoder;
+import java.util.Set;
 
-@Path("/")
+@Path(WebServerConstants.WEBSERVER_ROOT_PATH)
 @PermitAll
 public class LogInLogOutResources {
-  public static final String REDIRECT_QUERY_PARM = "redirect";
-  public static final String LOGIN_RESOURCE = "login";
 
-  @GET
-  @Path("/login")
-  @Produces(MediaType.TEXT_HTML)
-  public Viewable getLoginPage(@Context HttpServletRequest request, @Context HttpServletResponse response,
-      @Context SecurityContext sc, @Context UriInfo uriInfo, @QueryParam(REDIRECT_QUERY_PARM) String redirect)
-      throws Exception {
-    if (AuthDynamicFeature.isUserLoggedIn(sc)) {
-      // if the user is already login, forward the request to homepage.
-      request.getRequestDispatcher("/").forward(request, response);
-      return null;
-    }
+  @Inject
+  WorkManager workManager;
 
+
+  /**
+   * Update the destination URI to be redirect URI if specified in the request URL so that after the login is
+   * successful, request is forwarded to redirect page.
+   * @param redirect - Redirect parameter in the request URI
+   * @param request - Http Servlet Request
+   * @throws Exception
+   */
+  private void updateSessionRedirectInfo(String redirect, HttpServletRequest request) throws Exception {
     if (!StringUtils.isEmpty(redirect)) {
       // If the URL has redirect in it, set the redirect URI in session, so that after the login is successful, request
       // is forwarded to the redirect page.
@@ -65,27 +69,99 @@ public class LogInLogOutResources {
       final URI destURI = UriBuilder.fromUri(URLDecoder.decode(redirect, "UTF-8")).build();
       session.setAttribute(FormAuthenticator.__J_URI, destURI.toString());
     }
+  }
+
+  @GET
+  @Path(WebServerConstants.FORM_LOGIN_RESOURCE_PATH)
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getLoginPage(@Context HttpServletRequest request, @Context HttpServletResponse response,
+                               @Context SecurityContext sc, @Context UriInfo uriInfo,
+                               @QueryParam(WebServerConstants.REDIRECT_QUERY_PARM) String redirect) throws Exception {
+
+    if (AuthDynamicFeature.isUserLoggedIn(sc)) {
+      // if the user is already login, forward the request to homepage.
+      request.getRequestDispatcher(WebServerConstants.WEBSERVER_ROOT_PATH).forward(request, response);
+      return null;
+    }
 
+    updateSessionRedirectInfo(redirect, request);
     return ViewableWithPermissions.createLoginPage(null);
   }
 
+  @GET
+  @Path(WebServerConstants.SPENGO_LOGIN_RESOURCE_PATH)
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getSpnegoLogin(@Context HttpServletRequest request, @Context HttpServletResponse response,
+                                 @Context SecurityContext sc, @Context UriInfo uriInfo,
+                                 @QueryParam(WebServerConstants.REDIRECT_QUERY_PARM) String redirect) throws Exception {
+    if (AuthDynamicFeature.isUserLoggedIn(sc)) {
+      request.getRequestDispatcher(WebServerConstants.WEBSERVER_ROOT_PATH).forward(request, response);
+      return null;
+    }
+
+    final String errorString = "Invalid SPNEGO credentials or SPNEGO is not configured";
+    final DrillConfig drillConfig = workManager.getContext().getConfig();
+    MainLoginPageModel model = new MainLoginPageModel(errorString, drillConfig);
+    return ViewableWithPermissions.createMainLoginPage(model);
+  }
+
   // Request type is POST because POST request which contains the login credentials are invalid and the request is
   // dispatched here directly.
   @POST
-  @Path("/login")
+  @Path(WebServerConstants.FORM_LOGIN_RESOURCE_PATH)
   @Produces(MediaType.TEXT_HTML)
   public Viewable getLoginPageAfterValidationError() {
     return ViewableWithPermissions.createLoginPage("Invalid username/password credentials.");
   }
 
   @GET
-  @Path("/logout")
+  @Path(WebServerConstants.LOGOUT_RESOURCE_PATH)
   public void logout(@Context HttpServletRequest req, @Context HttpServletResponse resp) throws Exception {
     final HttpSession session = req.getSession();
     if (session != null) {
       session.invalidate();
     }
 
-    req.getRequestDispatcher("/").forward(req, resp);
+    req.getRequestDispatcher(WebServerConstants.WEBSERVER_ROOT_PATH).forward(req, resp);
+  }
+
+  @GET
+  @Path(WebServerConstants.MAIN_LOGIN_RESOURCE_PATH)
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getMainLoginPage(@Context HttpServletRequest request, @Context HttpServletResponse response,
+                                   @Context SecurityContext sc, @Context UriInfo uriInfo,
+                                   @QueryParam(WebServerConstants.REDIRECT_QUERY_PARM) String redirect) throws Exception {
+    updateSessionRedirectInfo(redirect, request);
+    final DrillConfig drillConfig = workManager.getContext().getConfig();
+    MainLoginPageModel model = new MainLoginPageModel(null, drillConfig);
+    return ViewableWithPermissions.createMainLoginPage(model);
+  }
+
+  private class MainLoginPageModel {
+
+    private final String error;
+
+    private final boolean authEnabled;
+
+    private final Set<String> configuredMechs;
+
+    MainLoginPageModel(String error, DrillConfig drillConfig) {
+      this.error = error;
+      authEnabled = drillConfig.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
+      configuredMechs = AuthStringUtil.asSet(
+          drillConfig.getStringList(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS));
+    }
+
+    public boolean isSpnegoEnabled() {
+      return authEnabled && configuredMechs.contains(Constraint.__SPNEGO_AUTH);
+    }
+
+    public boolean isFormEnabled() {
+      return authEnabled && configuredMechs.contains(Constraint.__FORM_AUTH);
+    }
+
+    public String getError() {
+      return error;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
index 73019aa..2eed0b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -47,8 +47,8 @@ public class ViewableWithPermissions extends Viewable {
    * @param model
    * @return
    */
-  public static Viewable create(final boolean authEnabled, final String templateName, final SecurityContext sc,
-      final Object model) {
+  public static Viewable create(final boolean authEnabled, final String templateName,
+                                final SecurityContext sc, final Object model) {
     return new ViewableWithPermissions(authEnabled, templateName, sc, true, model);
   }
 
@@ -61,13 +61,18 @@ public class ViewableWithPermissions extends Viewable {
     return new ViewableWithPermissions(true, "/rest/login.ftl", null, false, errorMsg);
   }
 
-  private ViewableWithPermissions(final boolean authEnabled, final String templateName, final SecurityContext sc,
-      final boolean showControls, final Object model) throws IllegalArgumentException {
+  public static Viewable createMainLoginPage(Object mainPageModel) {
+    return new ViewableWithPermissions(true, "/rest/mainLogin.ftl", null, false, mainPageModel);
+  }
+
+  private ViewableWithPermissions(final boolean authEnabled, final String templateName,
+                                  final SecurityContext sc, final boolean showControls,
+                                  final Object model) throws IllegalArgumentException {
     super(templateName, createModel(authEnabled, sc, showControls, model));
   }
 
   private static Map<String, Object> createModel(final boolean authEnabled, final SecurityContext sc,
-      final boolean showControls, final Object pageModel) {
+                                                 final boolean showControls, final Object pageModel) {
 
     final boolean isAdmin = !authEnabled /* when auth is disabled every user is an admin user */
         || (showControls && sc.isUserInRole(DrillUserPrincipal.ADMIN_ROLE));
@@ -82,8 +87,8 @@ public class ViewableWithPermissions extends Viewable {
         .put("showLogin", authEnabled && showControls && !isUserLoggedIn)
         .put("showLogout", authEnabled && showControls && isUserLoggedIn)
         .put("loggedInUserName", authEnabled && showControls &&
-            isUserLoggedIn ? sc.getUserPrincipal().getName() : DrillUserPrincipal.ANONYMOUS_USER)
-        .put("showControls", showControls);
+            isUserLoggedIn ? sc.getUserPrincipal().getName()
+                           : DrillUserPrincipal.ANONYMOUS_USER).put("showControls", showControls);
 
     if (pageModel != null) {
       mapBuilder.put("model", pageModel);

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index f0e822f..c702314 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -28,10 +28,11 @@ import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ssl.SSLConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.rpc.security.plain.PlainFactory;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.rest.auth.DrillErrorHandler;
 import org.apache.drill.exec.server.rest.auth.DrillRestLoginService;
+import org.apache.drill.exec.server.rest.auth.DrillHttpSecurityHandlerProvider;
 import org.apache.drill.exec.ssl.SSLConfigBuilder;
 import org.apache.drill.exec.work.WorkManager;
 import org.bouncycastle.asn1.x500.X500NameBuilder;
@@ -114,7 +115,7 @@ public class WebServer implements AutoCloseable {
   /**
    * Create Jetty based web server.
    *
-   * @param context Bootstrap context.
+   * @param context     Bootstrap context.
    * @param workManager WorkManager instance.
    */
   public WebServer(final BootStrapContext context, final WorkManager workManager, final Drillbit drillbit) {
@@ -136,7 +137,8 @@ public class WebServer implements AutoCloseable {
    * @return true if impersonation without authentication is enabled, false otherwise
    */
   public static boolean isImpersonationOnlyEnabled(DrillConfig config) {
-    return !config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED) && config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+    return !config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)
+        && config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   /**
@@ -150,11 +152,6 @@ public class WebServer implements AutoCloseable {
     }
 
     final boolean authEnabled = config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
-    if (authEnabled && !context.getAuthProvider().containsFactory(PlainFactory.SIMPLE_NAME)) {
-      logger.warn("Not starting web server. Currently Drill supports web authentication only through " +
-          "username/password. But PLAIN mechanism is not configured.");
-      return;
-    }
 
     port = config.getInt(ExecConstants.HTTP_PORT);
     boolean portHunt = config.getBoolean(ExecConstants.HTTP_PORT_HUNT);
@@ -187,9 +184,10 @@ public class WebServer implements AutoCloseable {
     }
   }
 
-  private ServletContextHandler createServletContextHandler(final boolean authEnabled) {
+  private ServletContextHandler createServletContextHandler(final boolean authEnabled) throws DrillbitStartupException {
     // Add resources
-    final ErrorHandler errorHandler = new ErrorHandler();
+    final ErrorHandler errorHandler = new DrillErrorHandler();
+
     errorHandler.setShowStacks(true);
     errorHandler.setShowMessageInTitle(true);
 
@@ -197,7 +195,8 @@ public class WebServer implements AutoCloseable {
     servletContextHandler.setErrorHandler(errorHandler);
     servletContextHandler.setContextPath("/");
 
-    final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(workManager, servletContextHandler.getServletContext(), drillbit)));
+    final ServletHolder servletHolder = new ServletHolder(new ServletContainer(
+        new DrillRestServer(workManager, servletContextHandler.getServletContext(), drillbit)));
     servletHolder.setInitOrder(1);
     servletContextHandler.addServlet(servletHolder, "/*");
 
@@ -207,16 +206,16 @@ public class WebServer implements AutoCloseable {
     final ServletHolder staticHolder = new ServletHolder("static", DefaultServlet.class);
     // Get resource URL for Drill static assets, based on where Drill icon is located
     String drillIconResourcePath =
-      Resource.newClassPathResource(BASE_STATIC_PATH + DRILL_ICON_RESOURCE_RELATIVE_PATH).getURL().toString();
-    staticHolder.setInitParameter(
-      "resourceBase",
-      drillIconResourcePath.substring(0,  drillIconResourcePath.length() - DRILL_ICON_RESOURCE_RELATIVE_PATH.length()));
+        Resource.newClassPathResource(BASE_STATIC_PATH + DRILL_ICON_RESOURCE_RELATIVE_PATH).getURL().toString();
+    staticHolder.setInitParameter("resourceBase",
+        drillIconResourcePath.substring(0, drillIconResourcePath.length() - DRILL_ICON_RESOURCE_RELATIVE_PATH.length()));
     staticHolder.setInitParameter("dirAllowed", "false");
     staticHolder.setInitParameter("pathInfoOnly", "true");
     servletContextHandler.addServlet(staticHolder, "/static/*");
 
     if (authEnabled) {
-      servletContextHandler.setSecurityHandler(createSecurityHandler());
+      //DrillSecurityHandler is used to support SPNEGO and FORM authentication together
+      servletContextHandler.setSecurityHandler(new DrillHttpSecurityHandlerProvider(config, workManager.getContext()));
       servletContextHandler.setSessionHandler(createSessionHandler(servletContextHandler.getSecurityHandler()));
     }
 
@@ -229,13 +228,13 @@ public class WebServer implements AutoCloseable {
     if (config.getBoolean(ExecConstants.HTTP_CORS_ENABLED)) {
       FilterHolder holder = new FilterHolder(CrossOriginFilter.class);
       holder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM,
-        StringUtils.join(config.getStringList(ExecConstants.HTTP_CORS_ALLOWED_ORIGINS), ","));
+          StringUtils.join(config.getStringList(ExecConstants.HTTP_CORS_ALLOWED_ORIGINS), ","));
       holder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM,
-        StringUtils.join(config.getStringList(ExecConstants.HTTP_CORS_ALLOWED_METHODS), ","));
+          StringUtils.join(config.getStringList(ExecConstants.HTTP_CORS_ALLOWED_METHODS), ","));
       holder.setInitParameter(CrossOriginFilter.ALLOWED_HEADERS_PARAM,
-        StringUtils.join(config.getStringList(ExecConstants.HTTP_CORS_ALLOWED_HEADERS), ","));
+          StringUtils.join(config.getStringList(ExecConstants.HTTP_CORS_ALLOWED_HEADERS), ","));
       holder.setInitParameter(CrossOriginFilter.ALLOW_CREDENTIALS_PARAM,
-        String.valueOf(config.getBoolean(ExecConstants.HTTP_CORS_CREDENTIALS)));
+          String.valueOf(config.getBoolean(ExecConstants.HTTP_CORS_CREDENTIALS)));
 
       for (String path : new String[]{"*.json", "/storage/*/enable/*", "/status*"}) {
         servletContextHandler.addFilter(holder, path, EnumSet.of(DispatcherType.REQUEST));
@@ -314,8 +313,7 @@ public class WebServer implements AutoCloseable {
     if (config.getBoolean(ExecConstants.HTTP_ENABLE_SSL)) {
       try {
         serverConnector = createHttpsConnector(port);
-      }
-      catch(DrillException e){
+      } catch (DrillException e) {
         throw new DrillbitStartupException(e.getMessage(), e);
       }
     } else {
@@ -366,11 +364,10 @@ public class WebServer implements AutoCloseable {
       final DateTime now = DateTime.now();
 
       // Create builder for certificate attributes
-      final X500NameBuilder nameBuilder =
-          new X500NameBuilder(BCStyle.INSTANCE)
-              .addRDN(BCStyle.OU, "Apache Drill (auth-generated)")
-              .addRDN(BCStyle.O, "Apache Software Foundation (auto-generated)")
-              .addRDN(BCStyle.CN, workManager.getContext().getEndpoint().getAddress());
+      final X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE)
+          .addRDN(BCStyle.OU, "Apache Drill (auth-generated)")
+          .addRDN(BCStyle.O, "Apache Software Foundation (auto-generated)")
+          .addRDN(BCStyle.CN, workManager.getContext().getEndpoint().getAddress());
 
       final Date notBefore = now.minusMinutes(1).toDate();
       final Date notAfter = now.plusYears(5).toDate();
@@ -422,6 +419,7 @@ public class WebServer implements AutoCloseable {
 
   /**
    * Create HTTP connector.
+   *
    * @return Initialized {@link ServerConnector} instance for HTTP connections.
    * @throws Exception
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServerConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServerConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServerConstants.java
new file mode 100644
index 0000000..5650d43
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServerConstants.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+/**
+ * Holds various constants used by WebServer components.
+ */
+public final class WebServerConstants {
+
+  private WebServerConstants() {}
+
+  public static final String REDIRECT_QUERY_PARM = "redirect";
+  public static final String WEBSERVER_ROOT_PATH = "/";
+
+  // Main Login page which help to choose between Form and Spnego authentication
+  public static final String MAIN_LOGIN_RESOURCE_NAME = "mainLogin";
+  public static final String MAIN_LOGIN_RESOURCE_PATH = WEBSERVER_ROOT_PATH + MAIN_LOGIN_RESOURCE_NAME;
+
+  // Login page for FORM authentication
+  public static final String FORM_LOGIN_RESOURCE_NAME = "login";
+  public static final String FORM_LOGIN_RESOURCE_PATH = WEBSERVER_ROOT_PATH + FORM_LOGIN_RESOURCE_NAME;
+
+  // Login page for SPNEGO authentication
+  public static final String SPENGO_LOGIN_RESOURCE_NAME = "spnegoLogin";
+  public static final String SPENGO_LOGIN_RESOURCE_PATH = WEBSERVER_ROOT_PATH + SPENGO_LOGIN_RESOURCE_NAME;
+
+  // Logout page
+  public static final String LOGOUT_RESOURCE_NAME = "logout";
+  public static final String LOGOUT_RESOURCE_PATH = WEBSERVER_ROOT_PATH + LOGOUT_RESOURCE_NAME;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AuthDynamicFeature.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AuthDynamicFeature.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AuthDynamicFeature.java
index bee0c9d..7ca739b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AuthDynamicFeature.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AuthDynamicFeature.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.server.rest.auth;
 
-import org.apache.drill.exec.server.rest.LogInLogOutResources;
+import org.apache.drill.exec.server.rest.WebServerConstants;
 import org.glassfish.jersey.server.model.AnnotatedMethod;
 
 import javax.annotation.Priority;
@@ -55,6 +55,9 @@ public class AuthDynamicFeature implements DynamicFeature {
     }
 
     // PermitAll takes precedence over RolesAllowed on the class
+    // This avoids putting AuthCheckFilter in the request flow for all path's which
+    // are defined under PermitAll annotation. That is requests for "/", "/login", "/mainLogin" and "/spnegoLogin"
+    // path's doesn't go through AuthCheckFilter.
     if (am.isAnnotationPresent(PermitAll.class)) {
       // Do nothing.
       return;
@@ -79,8 +82,8 @@ public class AuthDynamicFeature implements DynamicFeature {
           final String destResource =
               URLEncoder.encode(requestContext.getUriInfo().getRequestUri().toString(), "UTF-8");
           final URI loginURI = requestContext.getUriInfo().getBaseUriBuilder()
-              .path(LogInLogOutResources.LOGIN_RESOURCE)
-              .queryParam(LogInLogOutResources.REDIRECT_QUERY_PARM, destResource)
+              .path(WebServerConstants.MAIN_LOGIN_RESOURCE_NAME)
+              .queryParam(WebServerConstants.REDIRECT_QUERY_PARM, destResource)
               .build();
           requestContext.abortWith(Response.temporaryRedirect(loginURI).build()
           );

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillErrorHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillErrorHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillErrorHandler.java
new file mode 100644
index 0000000..df4825f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillErrorHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.auth;
+
+import org.apache.drill.exec.server.rest.WebServerConstants;
+import org.eclipse.jetty.server.handler.ErrorHandler;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.io.Writer;
+
+/**
+ * Custom ErrorHandler class for Drill's WebServer to have better error message in case when SPNEGO login failed and
+ * what to do next. In all other cases this would use the generic error page.
+ */
+public class DrillErrorHandler extends ErrorHandler {
+
+  @Override
+  protected void writeErrorPageMessage(HttpServletRequest request, Writer writer,
+                                       int code, String message, String uri) throws IOException {
+
+    super.writeErrorPageMessage(request, writer, code, message, uri);
+
+    if (uri.equals(WebServerConstants.SPENGO_LOGIN_RESOURCE_PATH)) {
+      writer.write("<p>SPNEGO Login Failed</p>");
+      writer.write("<p>Please check the requirements or use below link to use Form Authentication instead</p>");
+      writer.write("<a href='/login'> login </a>");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpConstraintSecurityHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpConstraintSecurityHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpConstraintSecurityHandler.java
new file mode 100644
index 0000000..0b095fb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpConstraintSecurityHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.auth;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.eclipse.jetty.security.ConstraintMapping;
+import org.eclipse.jetty.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.security.LoginService;
+import org.eclipse.jetty.security.authentication.LoginAuthenticator;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.drill.exec.server.rest.auth.DrillUserPrincipal.ADMIN_ROLE;
+import static org.apache.drill.exec.server.rest.auth.DrillUserPrincipal.AUTHENTICATED_ROLE;
+
+/**
+ * Accessor class that extends the ConstraintSecurityHandler to expose protected method's for start and stop of Handler.
+ * This is needed since now {@link DrillHttpSecurityHandlerProvider} composes of 2 security handlers -
+ * For FORM and SPNEGO and has responsibility to start/stop of those handlers.
+ **/
+public abstract class DrillHttpConstraintSecurityHandler extends ConstraintSecurityHandler {
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        super.doStop();
+    }
+
+    public abstract void doSetup(DrillbitContext dbContext) throws DrillException;
+
+    public void setup(LoginAuthenticator authenticator, LoginService loginService) {
+      final Set<String> knownRoles = ImmutableSet.of(AUTHENTICATED_ROLE, ADMIN_ROLE);
+      setConstraintMappings(Collections.<ConstraintMapping>emptyList(), knownRoles);
+      setAuthenticator(authenticator);
+      setLoginService(loginService);
+    }
+
+    public abstract String getImplName();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpSecurityHandlerProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpSecurityHandlerProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpSecurityHandlerProvider.java
new file mode 100644
index 0000000..3d77596
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillHttpSecurityHandlerProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.server.rest.auth;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.rpc.security.AuthStringUtil;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.rest.WebServerConstants;
+import org.eclipse.jetty.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.security.authentication.SessionAuthentication;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.util.security.Constraint;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class DrillHttpSecurityHandlerProvider extends ConstraintSecurityHandler {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHttpSecurityHandlerProvider.class);
+
+  private final Map<String, DrillHttpConstraintSecurityHandler> securityHandlers =
+      CaseInsensitiveMap.newHashMapWithExpectedSize(2);
+
+  public DrillHttpSecurityHandlerProvider(DrillConfig config, DrillbitContext drillContext)
+      throws DrillbitStartupException {
+
+    Preconditions.checkState(config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED));
+    final Set<String> configuredMechanisms = new HashSet<>();
+
+    if (config.hasPath(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS)) {
+      configuredMechanisms.addAll(AuthStringUtil.asSet(config.getStringList(ExecConstants.HTTP_AUTHENTICATION_MECHANISMS)));
+    } else { // for backward compatibility
+      configuredMechanisms.add(Constraint.__FORM_AUTH);
+    }
+
+      final ScanResult scan = drillContext.getClasspathScan();
+      final Collection<Class<? extends DrillHttpConstraintSecurityHandler>> factoryImpls =
+          scan.getImplementations(DrillHttpConstraintSecurityHandler.class);
+      logger.debug("Found DrillHttpConstraintSecurityHandler implementations: {}", factoryImpls);
+      for (final Class<? extends DrillHttpConstraintSecurityHandler> clazz : factoryImpls) {
+
+        // If all the configured mechanisms handler is added then break out of this loop
+        if (configuredMechanisms.isEmpty()) {
+          break;
+        }
+
+        Constructor<? extends DrillHttpConstraintSecurityHandler> validConstructor = null;
+        for (final Constructor<?> c : clazz.getConstructors()) {
+          final Class<?>[] params = c.getParameterTypes();
+          if (params.length == 0) {
+            validConstructor = (Constructor<? extends DrillHttpConstraintSecurityHandler>) c; // unchecked
+            break;
+          }
+        }
+
+        if (validConstructor == null) {
+          logger.warn("Skipping DrillHttpConstraintSecurityHandler class {}. It must implement at least one" +
+              " constructor with signature [{}()]", clazz.getCanonicalName(), clazz.getName());
+          continue;
+        }
+
+        try {
+          final DrillHttpConstraintSecurityHandler instance = validConstructor.newInstance();
+          if (configuredMechanisms.remove(instance.getImplName())) {
+            instance.doSetup(drillContext);
+            securityHandlers.put(instance.getImplName(), instance);
+          }
+        } catch (IllegalArgumentException | ReflectiveOperationException | DrillException e) {
+          logger.warn(String.format("Failed to create DrillHttpConstraintSecurityHandler of type '%s'",
+              clazz.getCanonicalName()), e);
+        }
+      }
+
+    if (securityHandlers.size() == 0) {
+      throw new DrillbitStartupException("Authentication is enabled for WebServer but none of the security mechanism " +
+          "was configured properly. Please verify the configurations and try again.");
+    }
+
+    logger.info("Configure auth mechanisms for WebServer are: {}", securityHandlers.keySet());
+  }
+
+  @Override
+  public void doStart() throws Exception {
+    super.doStart();
+    for (DrillHttpConstraintSecurityHandler securityHandler : securityHandlers.values()) {
+      securityHandler.doStart();
+    }
+  }
+
+  @Override
+  public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+      throws IOException, ServletException {
+
+    Preconditions.checkState(securityHandlers.size() > 0);
+
+    HttpSession session = request.getSession(true);
+    SessionAuthentication authentication =
+        (SessionAuthentication) session.getAttribute(SessionAuthentication.__J_AUTHENTICATED);
+    String uri = request.getRequestURI();
+    final DrillHttpConstraintSecurityHandler securityHandler;
+
+    // Before authentication, all requests go through the FormAuthenticator if configured except for /spnegoLogin
+    // request. For SPNEGO authentication all requests will be forced going via /spnegoLogin before authentication is
+    // done, this is to ensure that we don't have to authenticate same client session multiple times for each resource.
+    //
+    // If this authentication is null, user hasn't logged in yet
+    if (authentication == null) {
+
+      // 1) If only SPNEGOSecurity handler then use SPNEGOSecurity
+      // 2) If both but uri equals spnegoLogin then use SPNEGOSecurity
+      // 3) If both but uri doesn't equals spnegoLogin then use FORMSecurity
+      // 4) If only FORMSecurity handler then use FORMSecurity
+      if (isSpnegoEnabled() && (!isFormEnabled() || uri.equals(WebServerConstants.SPENGO_LOGIN_RESOURCE_PATH))) {
+        securityHandler = securityHandlers.get(Constraint.__SPNEGO_AUTH);
+        securityHandler.handle(target, baseRequest, request, response);
+      } else if (isFormEnabled()) {
+        securityHandler = securityHandlers.get(Constraint.__FORM_AUTH);
+        securityHandler.handle(target, baseRequest, request, response);
+      }
+    }
+    // If user has logged in, use the corresponding handler to handle the request
+    else {
+      final String authMethod = authentication.getAuthMethod();
+      securityHandler = securityHandlers.get(authMethod);
+      securityHandler.handle(target, baseRequest, request, response);
+    }
+  }
+
+  @Override
+  public void setHandler(Handler handler) {
+    super.setHandler(handler);
+    for (DrillHttpConstraintSecurityHandler securityHandler : securityHandlers.values()) {
+      securityHandler.setHandler(handler);
+    }
+  }
+
+  public void doStop() throws Exception {
+    super.doStop();
+    for (DrillHttpConstraintSecurityHandler securityHandler : securityHandlers.values()) {
+      securityHandler.doStop();
+    }
+  }
+
+  public boolean isSpnegoEnabled() {
+    return securityHandlers.containsKey(Constraint.__SPNEGO_AUTH);
+  }
+
+  public boolean isFormEnabled() {
+    return securityHandlers.containsKey(Constraint.__FORM_AUTH);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoAuthenticator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoAuthenticator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoAuthenticator.java
new file mode 100644
index 0000000..10f21ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoAuthenticator.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.server.rest.auth;
+
+
+import org.apache.drill.exec.server.rest.WebServerConstants;
+import org.apache.parquet.Strings;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.security.ServerAuthException;
+import org.eclipse.jetty.security.UserAuthentication;
+import org.eclipse.jetty.security.authentication.DeferredAuthentication;
+import org.eclipse.jetty.security.authentication.SessionAuthentication;
+import org.eclipse.jetty.security.authentication.SpnegoAuthenticator;
+import org.eclipse.jetty.server.Authentication;
+import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.server.UserIdentity;
+
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import java.io.IOException;
+
+/**
+ * Custom SpnegoAuthenticator for Drill
+ */
+public class DrillSpnegoAuthenticator extends SpnegoAuthenticator {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSpnegoAuthenticator.class);
+
+  public DrillSpnegoAuthenticator(String authMethod) {
+    super(authMethod);
+  }
+
+  /**
+   * Updated logic as compared to default implementation in
+   * {@link SpnegoAuthenticator#validateRequest(ServletRequest, ServletResponse, boolean)} to handle below cases:
+   * 1) Perform SPNEGO authentication only when spnegoLogin resource is requested. This helps to avoid authentication
+   *    for each and every resource which the JETTY provided authenticator does.
+   * 2) Helps to redirect to the target URL after authentication is done successfully.
+   * 3) Clear-Up in memory session information once LogOut is triggered such that any future request also triggers SPNEGO
+   *    authentication.
+   * @param request
+   * @param response
+   * @param mandatoryAuth
+   * @return
+   * @throws ServerAuthException
+   */
+  @Override
+  public Authentication validateRequest(ServletRequest request, ServletResponse response, boolean mandatoryAuth)
+      throws ServerAuthException {
+
+    final HttpServletRequest req = (HttpServletRequest) request;
+    final HttpSession session = req.getSession(true);
+    final Authentication authentication = (Authentication) session.getAttribute(SessionAuthentication.__J_AUTHENTICATED);
+    final String uri = req.getRequestURI();
+
+    // If the Request URI is for /spnegoLogin then perform login
+    final boolean mandatory = mandatoryAuth || uri.equals(WebServerConstants.SPENGO_LOGIN_RESOURCE_PATH);
+
+    // For logout remove the attribute from the session that holds UserIdentity
+    if (authentication != null) {
+      if (uri.equals(WebServerConstants.LOGOUT_RESOURCE_PATH)) {
+        logger.debug("Logging out user {}", req.getRemoteAddr());
+        session.removeAttribute(SessionAuthentication.__J_AUTHENTICATED);
+        return null;
+      }
+
+      // Already logged in so just return the session attribute.
+      return authentication;
+    }
+
+    // Try to authenticate an unauthenticated session.
+    return authenticateSession(request, response, mandatory);
+  }
+
+  /**
+   * Method to authenticate a user session using the SPNEGO token passed in AUTHORIZATION header of request.
+   * @param request
+   * @param response
+   * @param mandatory
+   * @return
+   * @throws ServerAuthException
+   */
+  private Authentication authenticateSession(ServletRequest request, ServletResponse response, boolean mandatory)
+      throws ServerAuthException {
+
+    final HttpServletRequest req = (HttpServletRequest) request;
+    final HttpServletResponse res = (HttpServletResponse) response;
+    final HttpSession session = req.getSession(true);
+
+    // Defer the authentication if not mandatory.
+    if (!mandatory) {
+      return new DeferredAuthentication(this);
+    }
+
+    // Authentication is mandatory, get the Authorization header
+    final String header = req.getHeader(HttpHeader.AUTHORIZATION.asString());
+
+    // Authorization header is null, so send the 401 error code to client along with negotiate header
+    if (header == null) {
+      try {
+        if (DeferredAuthentication.isDeferred(res)) {
+          return Authentication.UNAUTHENTICATED;
+        } else {
+          res.setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), HttpHeader.NEGOTIATE.asString());
+          res.sendError(401);
+          logger.debug("DrillSpnegoAuthenticator: Sending challenge to client {}", req.getRemoteAddr());
+          return Authentication.SEND_CONTINUE;
+        }
+      } catch (IOException e) {
+        logger.error("DrillSpnegoAuthenticator: Failed while sending challenge to client {}", req.getRemoteAddr(), e);
+        throw new ServerAuthException(e);
+      }
+    }
+
+    // Valid Authorization header received. Get the SPNEGO token sent by client and try to authenticate
+    logger.debug("DrillSpnegoAuthenticator: Received NEGOTIATE Response back from client {}", req.getRemoteAddr());
+    final String negotiateString = HttpHeader.NEGOTIATE.asString();
+
+    if (header.startsWith(negotiateString)) {
+      final String spnegoToken = header.substring(negotiateString.length() + 1);
+      final UserIdentity user = this.login(null, spnegoToken, request);
+
+      //redirect the request to the desired page after successful login
+      if (user != null) {
+        String newUri = (String) session.getAttribute("org.eclipse.jetty.security.form_URI");
+        if (Strings.isNullOrEmpty(newUri)) {
+          newUri = req.getContextPath();
+          if (Strings.isNullOrEmpty(newUri)) {
+            newUri = WebServerConstants.WEBSERVER_ROOT_PATH;
+          }
+        }
+
+        response.setContentLength(0);
+        final HttpChannel channel = HttpChannel.getCurrentHttpChannel();
+        final Response base_response = channel.getResponse();
+        final Request base_request = channel.getRequest();
+        final int redirectCode =
+            base_request.getHttpVersion().getVersion() < HttpVersion.HTTP_1_1.getVersion() ? 302 : 303;
+        try {
+          base_response.sendRedirect(redirectCode, res.encodeRedirectURL(newUri));
+        } catch (IOException e) {
+          logger.error("DrillSpnegoAuthenticator: Failed while using the redirect URL {} from client {}", newUri,
+              req.getRemoteAddr(), e);
+          throw new ServerAuthException(e);
+        }
+
+        logger.debug("DrillSpnegoAuthenticator: Successfully authenticated this client session: {}",
+            user.getUserPrincipal().getName());
+        return new UserAuthentication(this.getAuthMethod(), user);
+      }
+    }
+
+    logger.debug("DrillSpnegoAuthenticator: Authentication failed for client session: {}", req.getRemoteAddr());
+    return Authentication.UNAUTHENTICATED;
+
+  }
+
+  public UserIdentity login(String username, Object password, ServletRequest request) {
+    final UserIdentity user = super.login(username, password, request);
+
+    if (user != null) {
+      final HttpSession session = ((HttpServletRequest) request).getSession(true);
+      final Authentication cached = new SessionAuthentication(this.getAuthMethod(), user, password);
+      session.setAttribute(SessionAuthentication.__J_AUTHENTICATED, cached);
+    }
+
+    return user;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoLoginService.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoLoginService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoLoginService.java
new file mode 100644
index 0000000..e7fbc16
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillSpnegoLoginService.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.server.rest.auth;
+
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.eclipse.jetty.security.DefaultIdentityService;
+import org.eclipse.jetty.security.SpnegoLoginService;
+import org.eclipse.jetty.server.UserIdentity;
+import org.eclipse.jetty.util.B64Code;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+import javax.security.auth.Subject;
+import java.lang.reflect.Field;
+import java.security.Principal;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Custom implementation of DrillSpnegoLoginService to avoid the need of passing targetName in a config file,
+ * to include the SPNEGO OID and the way UserIdentity is created.
+ */
+public class DrillSpnegoLoginService extends SpnegoLoginService {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSpnegoLoginService.class);
+
+  private static final String TARGET_NAME_FIELD_NAME = "_targetName";
+
+  private final DrillbitContext drillContext;
+
+  private final SpnegoConfig spnegoConfig;
+
+  private final UserGroupInformation loggedInUgi;
+
+  public DrillSpnegoLoginService(DrillbitContext drillBitContext) throws DrillException {
+    super(DrillSpnegoLoginService.class.getName());
+    setIdentityService(new DefaultIdentityService());
+    drillContext = drillBitContext;
+
+    // Load and verify SPNEGO config. Then Login using creds to get an UGI instance
+    spnegoConfig = new SpnegoConfig(drillBitContext.getConfig());
+    spnegoConfig.validateSpnegoConfig();
+    loggedInUgi = spnegoConfig.getLoggedInUgi();
+  }
+
+  @Override
+  protected void doStart() throws Exception {
+    // Override the parent implementation, setting _targetName to be the serverPrincipal
+    // without the need for a one-line file to do the same thing.
+    final Field targetNameField = SpnegoLoginService.class.getDeclaredField(TARGET_NAME_FIELD_NAME);
+    targetNameField.setAccessible(true);
+    targetNameField.set(this, spnegoConfig.getSpnegoPrincipal());
+  }
+
+  @Override
+  public UserIdentity login(final String username, final Object credentials) {
+
+    UserIdentity identity = null;
+    try {
+      identity = loggedInUgi.doAs(new PrivilegedExceptionAction<UserIdentity>() {
+        @Override
+        public UserIdentity run() {
+          return spnegoLogin(credentials);
+        }
+      });
+    } catch (Exception e) {
+      logger.error("Failed to login using SPNEGO", e);
+    }
+
+    return identity;
+  }
+
+  private UserIdentity spnegoLogin(Object credentials) {
+
+    String encodedAuthToken = (String) credentials;
+    byte[] authToken = B64Code.decode(encodedAuthToken);
+    GSSManager manager = GSSManager.getInstance();
+
+    try {
+      // Providing both OID's is required here. If we provide only one,
+      // we're requiring that clients provide us the SPNEGO OID to authenticate via Kerberos.
+      Oid[] knownOids = new Oid[2];
+      knownOids[0] = new Oid("1.3.6.1.5.5.2"); // spnego
+      knownOids[1] = new Oid("1.2.840.113554.1.2.2"); // kerberos
+
+      GSSName gssName = manager.createName(spnegoConfig.getSpnegoPrincipal(), null);
+      GSSCredential serverCreds = manager.createCredential(gssName, GSSCredential.INDEFINITE_LIFETIME,
+          knownOids, GSSCredential.ACCEPT_ONLY);
+      GSSContext gContext = manager.createContext(serverCreds);
+
+      if (gContext == null) {
+        logger.debug("SPNEGOUserRealm: failed to establish GSSContext");
+      } else {
+        while (!gContext.isEstablished()) {
+          authToken = gContext.acceptSecContext(authToken, 0, authToken.length);
+        }
+
+        if (gContext.isEstablished()) {
+          String clientName = gContext.getSrcName().toString();
+          String role = clientName.substring(clientName.indexOf(64) + 1);
+
+          final SystemOptionManager sysOptions = drillContext.getOptionManager();
+          final boolean isAdmin = ImpersonationUtil.hasAdminPrivileges(role,
+              ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(sysOptions),
+              ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(sysOptions));
+
+          final Principal user = new DrillUserPrincipal(clientName, isAdmin);
+          final Subject subject = new Subject();
+          subject.getPrincipals().add(user);
+
+          if (isAdmin) {
+            return this._identityService.newUserIdentity(subject, user, DrillUserPrincipal.ADMIN_USER_ROLES);
+          } else {
+            return this._identityService.newUserIdentity(subject, user, DrillUserPrincipal.NON_ADMIN_USER_ROLES);
+          }
+        }
+      }
+    } catch (GSSException gsse) {
+      logger.warn("Caught GSSException trying to authenticate the client", gsse);
+    }
+    return null;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
index a21977f..01206c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java
@@ -38,9 +38,11 @@ public class DrillUserPrincipal implements Principal {
 
   public static final String[] NON_ADMIN_USER_ROLES = new String[]{AUTHENTICATED_ROLE};
 
-  public static final List<RolePrincipal> ADMIN_PRINCIPALS = ImmutableList.of(new RolePrincipal(AUTHENTICATED_ROLE), new RolePrincipal(ADMIN_ROLE));
+  public static final List<RolePrincipal> ADMIN_PRINCIPALS =
+      ImmutableList.of(new RolePrincipal(AUTHENTICATED_ROLE), new RolePrincipal(ADMIN_ROLE));
 
-  public static final List<RolePrincipal> NON_ADMIN_PRINCIPALS = ImmutableList.of(new RolePrincipal(AUTHENTICATED_ROLE));
+  public static final List<RolePrincipal> NON_ADMIN_PRINCIPALS =
+      ImmutableList.of(new RolePrincipal(AUTHENTICATED_ROLE));
 
   private final String userName;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/FormSecurityHanlder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/FormSecurityHanlder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/FormSecurityHanlder.java
new file mode 100644
index 0000000..31d7cec
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/FormSecurityHanlder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.auth;
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.rest.WebServerConstants;
+import org.eclipse.jetty.security.authentication.FormAuthenticator;
+import org.eclipse.jetty.util.security.Constraint;
+
+public class FormSecurityHanlder extends DrillHttpConstraintSecurityHandler {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormSecurityHanlder.class);
+
+  @Override
+  public String getImplName() {
+    return Constraint.__FORM_AUTH;
+  }
+
+  @Override
+  public void doSetup(DrillbitContext dbContext) throws DrillException {
+
+    // Check if PAMAuthenticator is available or not which is required for FORM authentication
+    if (!dbContext.getAuthProvider().containsFactory(PlainFactory.SIMPLE_NAME)) {
+      throw new DrillException("FORM mechanism was configured but PLAIN mechanism is not enabled to provide an " +
+          "authenticator. Please configure user authentication with PLAIN mechanism and authenticator to use " +
+          "FORM authentication");
+    }
+
+    setup(new FormAuthenticator(WebServerConstants.FORM_LOGIN_RESOURCE_PATH,
+        WebServerConstants.FORM_LOGIN_RESOURCE_PATH, true), new DrillRestLoginService(dbContext));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java
new file mode 100644
index 0000000..a64d7de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.server.rest.auth;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class SpnegoConfig {
+
+  private UserGroupInformation loggedInUgi;
+
+  private final String principal;
+
+  private final String keytab;
+
+  public SpnegoConfig(DrillConfig config) {
+
+    keytab = config.hasPath(ExecConstants.HTTP_SPNEGO_KEYTAB) ?
+        config.getString(ExecConstants.HTTP_SPNEGO_KEYTAB) :
+        null;
+
+    principal = config.hasPath(ExecConstants.HTTP_SPNEGO_PRINCIPAL) ?
+        config.getString(ExecConstants.HTTP_SPNEGO_PRINCIPAL) :
+        null;
+  }
+
+  //Reads the SPNEGO principal from the config file
+  public String getSpnegoPrincipal() {
+    return principal;
+  }
+
+  public void validateSpnegoConfig() throws DrillException {
+
+    StringBuilder errorMsg = new StringBuilder();
+
+    if (principal != null && keytab != null) {
+      return;
+    }
+
+    if (principal == null) {
+      errorMsg.append("\nConfiguration ");
+      errorMsg.append(ExecConstants.HTTP_SPNEGO_PRINCIPAL);
+      errorMsg.append(" is not found");
+    }
+
+    if (keytab == null) {
+      errorMsg.append("\nConfiguration ");
+      errorMsg.append(ExecConstants.HTTP_SPNEGO_KEYTAB);
+      errorMsg.append(" is not found");
+    }
+
+    throw new DrillException(errorMsg.toString());
+  }
+
+  public UserGroupInformation getLoggedInUgi() throws DrillException {
+
+    if (loggedInUgi != null) {
+      return loggedInUgi;
+    }
+    loggedInUgi = loginAndReturnUgi();
+    return loggedInUgi;
+  }
+
+  //Performs the Server login to KDC for SPNEGO
+  private UserGroupInformation loginAndReturnUgi() throws DrillException {
+
+    validateSpnegoConfig();
+
+    UserGroupInformation ugi;
+    try {
+      // Check if security is not enabled and try to set the security parameter to login the principal.
+      // After the login is performed reset the static UGI state.
+      if (!UserGroupInformation.isSecurityEnabled()) {
+        final Configuration newConfig = new Configuration();
+        newConfig.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+            UserGroupInformation.AuthenticationMethod.KERBEROS.toString());
+
+        UserGroupInformation.setConfiguration(newConfig);
+        ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+
+        // Reset the original configuration for static UGI
+        UserGroupInformation.setConfiguration(new Configuration());
+      } else {
+        ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+      }
+    } catch (Exception e) {
+      throw new DrillException(String.format("Login failed for %s with given keytab", principal), e);
+    }
+    return ugi;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoSecurityHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoSecurityHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoSecurityHandler.java
new file mode 100644
index 0000000..9e6acb1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoSecurityHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.auth;
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.eclipse.jetty.util.security.Constraint;
+
+public class SpnegoSecurityHandler extends DrillHttpConstraintSecurityHandler {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpnegoSecurityHandler.class);
+
+  @Override
+  public String getImplName() {
+    return Constraint.__SPNEGO_AUTH;
+  }
+
+  @Override
+  public void doSetup(DrillbitContext dbContext) throws DrillException {
+    setup(new DrillSpnegoAuthenticator(getImplName()), new DrillSpnegoLoginService(dbContext));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c923e4f..a0cf643 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -27,6 +27,7 @@ drill {
       org.apache.drill.exec.physical.impl.RootCreator,
       org.apache.drill.exec.rpc.user.security.UserAuthenticator,
       org.apache.drill.exec.rpc.security.AuthenticatorFactory,
+      org.apache.drill.exec.server.rest.auth.DrillHttpConstraintSecurityHandler,
       org.apache.drill.exec.store.dfs.FormatPlugin,
       org.apache.drill.exec.store.StoragePlugin
     ],
@@ -38,7 +39,8 @@ drill {
           org.apache.drill.exec.physical,
           org.apache.drill.exec.store,
           org.apache.drill.exec.rpc.user.security,
-          org.apache.drill.exec.rpc.security
+          org.apache.drill.exec.rpc.security,
+          org.apache.drill.exec.server.rest.auth
     ]
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/resources/rest/generic.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/generic.ftl b/exec/java-exec/src/main/resources/rest/generic.ftl
index 9025adb..d04414f 100644
--- a/exec/java-exec/src/main/resources/rest/generic.ftl
+++ b/exec/java-exec/src/main/resources/rest/generic.ftl
@@ -77,7 +77,7 @@
               </#if>
               <li><a href="http://drill.apache.org/docs/">Documentation</a>
               <#if showLogin == true >
-              <li><a href="/login">Log In</a>
+              <li><a href="/mainLogin">Log In</a>
               </#if>
               <#if showLogout == true >
               <li><a href="/logout">Log Out (${loggedInUserName})</a>

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/main/resources/rest/mainLogin.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/mainLogin.ftl b/exec/java-exec/src/main/resources/rest/mainLogin.ftl
new file mode 100644
index 0000000..8deb156
--- /dev/null
+++ b/exec/java-exec/src/main/resources/rest/mainLogin.ftl
@@ -0,0 +1,32 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<#include "*/generic.ftl">
+<#macro page_head>
+</#macro>
+
+<#macro page_body>
+        <div class="page-header">
+        </div>
+        <div class="container container-table">
+        <div align="center" class="table-responsive">
+        <#if model?? && model.isFormEnabled()>
+        <a href ="/login" class="btn btn-primary"> Login using FORM AUTHENTICATION </a>
+        </#if>
+        <#if model?? && model.isSpnegoEnabled()>
+        <a href = "/spnegoLogin" class="btn btn-primary"> Login using SPNEGO </a>
+        </#if>
+        <#if model?? && model.getError()??>
+        <p style="color:red">${model.getError()}</p></br>
+        </#if>
+        </div>
+        </div>
+</#macro>
+<@page_html/>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index 81b027f..a24b0db 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -95,7 +95,8 @@ public class TestBitBitKerberos extends BaseTestQuery {
   public static void setupTest() throws Exception {
 
     final Config config = DrillConfig.create(cloneDefaultTestConfigProperties());
-    krbHelper = new KerberosHelper(TestBitBitKerberos.class.getSimpleName());
+
+    krbHelper = new KerberosHelper(TestBitBitKerberos.class.getSimpleName(), null);
     krbHelper.setupKdc(dirTestWatcher.getTmpDir());
 
     newConfig = new DrillConfig(

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
index 451e0aa..8ba4d18 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
@@ -49,10 +49,14 @@ public class KerberosHelper {
 
   private boolean kdcStarted;
 
-  public KerberosHelper(final String testName) {
+  public KerberosHelper(final String testName, String serverShortName) {
     final String realm = "EXAMPLE.COM";
     CLIENT_PRINCIPAL = CLIENT_SHORT_NAME + "@" + realm;
-    final String serverShortName = System.getProperty("user.name");
+
+    if (serverShortName == null) {
+      serverShortName = System.getProperty("user.name");
+    }
+
     SERVER_PRINCIPAL = serverShortName + "/" + HOSTNAME + "@" + realm;
     this.testName = testName;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
index d6495e9..065746c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
@@ -56,7 +56,7 @@ public class TestUserBitKerberos extends BaseTestQuery {
   @BeforeClass
   public static void setupTest() throws Exception {
 
-    krbHelper = new KerberosHelper(TestUserBitKerberos.class.getSimpleName());
+    krbHelper = new KerberosHelper(TestUserBitKerberos.class.getSimpleName(), null);
     krbHelper.setupKdc(dirTestWatcher.getTmpDir());
 
     // Create a new DrillConfig which has user authentication enabled and authenticator set to

http://git-wip-us.apache.org/repos/asf/drill/blob/adee4614/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
index 4f411ae..ac60880 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
@@ -60,7 +60,7 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
 
   @BeforeClass
   public static void setupTest() throws Exception {
-    krbHelper = new KerberosHelper(TestUserBitKerberosEncryption.class.getSimpleName());
+    krbHelper = new KerberosHelper(TestUserBitKerberosEncryption.class.getSimpleName(), null);
     krbHelper.setupKdc(dirTestWatcher.getTmpDir());
 
     // Create a new DrillConfig which has user authentication enabled and authenticator set to


[6/7] drill git commit: DRILL-5994: Added webserver maxThreads configuration option to enable launching on a machine with more than 200 cores

Posted by ar...@apache.org.
DRILL-5994: Added webserver maxThreads configuration option to enable launching on a machine with more than 200 cores

closes #1069


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ce80da85
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ce80da85
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ce80da85

Branch: refs/heads/master
Commit: ce80da857d1b28af7619f8402ffe1e4e3c833e1c
Parents: 3df11e1
Author: mitchel <mi...@hotmail.com>
Authored: Tue Dec 12 10:27:19 2017 -0500
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jan 2 15:53:57 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/drill/exec/ExecConstants.java       | 1 +
 .../main/java/org/apache/drill/exec/server/rest/WebServer.java   | 3 ++-
 exec/java-exec/src/main/resources/drill-module.conf              | 4 +++-
 3 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ce80da85/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 5059b4f..d155466 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -167,6 +167,7 @@ public final class ExecConstants {
   public static final String USE_LOGIN_PRINCIPAL = "drill.exec.security.bit.auth.use_login_principal";
   public static final String USER_ENCRYPTION_SASL_ENABLED = "drill.exec.security.user.encryption.sasl.enabled";
   public static final String USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.user.encryption.sasl.max_wrapped_size";
+  public static final String WEB_SERVER_THREAD_POOL_MAX = "drill.exec.web_server.thread_pool_max";
 
   public static final String USER_SSL_ENABLED = "drill.exec.security.user.encryption.ssl.enabled";
   public static final String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled";

http://git-wip-us.apache.org/repos/asf/drill/blob/ce80da85/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index c702314..4566e7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -67,6 +67,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.CrossOriginFilter;
 import org.eclipse.jetty.util.resource.Resource;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.joda.time.DateTime;
 
@@ -158,7 +159,7 @@ public class WebServer implements AutoCloseable {
     int retry = 0;
 
     for (; retry < PORT_HUNT_TRIES; retry++) {
-      embeddedJetty = new Server();
+      embeddedJetty = new Server(new QueuedThreadPool(config.getInt(ExecConstants.WEB_SERVER_THREAD_POOL_MAX)));
       embeddedJetty.setHandler(createServletContextHandler(authEnabled));
       embeddedJetty.addConnector(createConnector(port));
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ce80da85/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index cb66ca3..97ac19d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -379,7 +379,9 @@ drill.exec: {
   # refresh time.
   grace_period_ms : 0,
   //port hunting for drillbits. Enabled only for testing purposes.
-  port_hunt : false
+  port_hunt : false,
+  // Max threads of embedded Jetty
+  web_server.thread_pool_max: 200
 
 }
 


[4/7] drill git commit: DRILL-5963: Query state process improvements

Posted by ar...@apache.org.
DRILL-5963: Query state process improvements

1. Added two new query states: PREPARING (when foreman is initialized) and PLANNING (includes logical and / or physical planning).
2. Ability to cancel query during planning and enqueued states was added.
3. Logic for submitting fragments was moved from Foreman to new class FragmentsRunner.
4. Logic for moving query from to new state and incrementing / decrementing query counters was moved into QueryStateProcessor class.
5. Major type in DrillFuncHolderExpr was cached for better performance.

closes #1051


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/03435183
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/03435183
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/03435183

Branch: refs/heads/master
Commit: 034351837885d54196f3dd550ed2dc4e50fc4128
Parents: adee461
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Fri Nov 24 17:59:42 2017 +0200
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jan 2 15:43:29 2018 +0200

----------------------------------------------------------------------
 .../drill/exec/expr/DrillFuncHolderExpr.java    |   9 +-
 .../exec/server/rest/profile/ProfileUtil.java   |  41 +-
 .../server/rest/profile/ProfileWrapper.java     |   6 +-
 .../apache/drill/exec/work/QueryWorkUnit.java   |  45 +
 .../apache/drill/exec/work/foreman/Foreman.java | 887 ++++---------------
 .../exec/work/foreman/FragmentsRunner.java      | 410 +++++++++
 .../drill/exec/work/foreman/QueryManager.java   |  20 +-
 .../exec/work/foreman/QueryStateProcessor.java  | 355 ++++++++
 .../src/main/resources/drill-module.conf        |   2 +-
 .../exec/server/TestDrillbitResilience.java     |   3 +-
 .../apache/drill/exec/proto/UserBitShared.java  | 201 +++--
 .../drill/exec/proto/beans/QueryResult.java     |   6 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   2 +
 13 files changed, 1166 insertions(+), 821 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
index 90368c4..1b0a6eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
@@ -27,18 +27,21 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 
 public class DrillFuncHolderExpr extends FunctionHolderExpression implements Iterable<LogicalExpression>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class);
-  private DrillFuncHolder holder;
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class);
+  private final DrillFuncHolder holder;
+  private final MajorType majorType;
   private DrillSimpleFunc interpreter;
 
   public DrillFuncHolderExpr(String nameUsed, DrillFuncHolder holder, List<LogicalExpression> args, ExpressionPosition pos) {
     super(nameUsed, pos, args);
     this.holder = holder;
+    // since function return type can not be changed, cache it for better performance
+    this.majorType = holder.getReturnType(args);
   }
 
   @Override
   public MajorType getMajorType() {
-    return holder.getReturnType(args);
+    return majorType;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
index cfc7977..a0c0ea7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
@@ -19,30 +19,35 @@ package org.apache.drill.exec.server.rest.profile;
 
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class ProfileUtil {
-  // Display names for QueryState enum in UserBitShared.proto
-  private static final String[] queryStateDisplayNames = {
-    "Starting", // STARTING = 0
-    "Running", // RUNNING = 1
-    "Succeeded", // COMPLETED = 2
-    "Canceled", // CANCELED = 3
-    "Failed", // FAILED = 4
-    "CancellationRequested", // CANCELLATION_REQUESTED = 5
-    "Enqueued" // ENQUEUED = 6
-  };
 
+  private static final Map<QueryState, String> queryStateDisplayMap = new HashMap<>(QueryState.values().length);
+
+  static {
+    queryStateDisplayMap.put(QueryState.PREPARING, "Preparing");
+    queryStateDisplayMap.put(QueryState.PLANNING, "Planning");
+    queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued");
+    queryStateDisplayMap.put(QueryState.STARTING, "Starting");
+    queryStateDisplayMap.put(QueryState.RUNNING, "Running");
+    queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded");
+    queryStateDisplayMap.put(QueryState.FAILED, "Failed");
+    queryStateDisplayMap.put(QueryState.CANCELLATION_REQUESTED, "Cancellation Requested");
+    queryStateDisplayMap.put(QueryState.CANCELED, "Canceled");
+  }
 
   /**
-   * Utility to return display name for query state
-   * @param queryState
+   * Utility method to return display name for query state
+   * @param queryState query state
    * @return display string for query state
    */
-  public final static String getQueryStateDisplayName(QueryState queryState) {
-    int queryStateOrdinal = queryState.getNumber();
-    if (queryStateOrdinal >= queryStateDisplayNames.length) {
-      return queryState.name();
-    } else {
-      return queryStateDisplayNames[queryStateOrdinal];
+  public static String getQueryStateDisplayName(QueryState queryState) {
+    String displayName = queryStateDisplayMap.get(queryState);
+    if (displayName == null) {
+      displayName = queryState.name();
     }
+    return displayName;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index 9c2b438..20cc0ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -202,8 +202,10 @@ public class ProfileWrapper {
   }
 
   public String getExecutionDuration() {
-    //Check if State is STARTING or RUNNING
-    if (profile.getState() == QueryState.STARTING ||
+    //Check if State is PREPARING, PLANNING, STARTING, ENQUEUED or RUNNING
+    if (profile.getState() == QueryState.PREPARING ||
+        profile.getState() == QueryState.PLANNING ||
+        profile.getState() == QueryState.STARTING ||
         profile.getState() == QueryState.ENQUEUED ||
         profile.getState() == QueryState.RUNNING) {
       return NOT_AVAILABLE_LABEL;

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
index a06d46c..2fa7576 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -23,11 +23,14 @@ import java.util.List;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
+import org.codehaus.jackson.map.ObjectMapper;
 
 public class QueryWorkUnit {
 
@@ -112,4 +115,46 @@ public class QueryWorkUnit {
       fragments.add(defn.applyPlan(reader));
     }
   }
+
+  /**
+   * Converts list of stored fragments into their string representation,
+   * in case of exception returns text indicating that string was malformed.
+   * Is used for debugging purposes.
+   *
+   * @return fragments information
+   */
+  public String stringifyFragments() {
+    StringBuilder stringBuilder = new StringBuilder();
+    final int fragmentCount = fragments.size();
+    int fragmentIndex = 0;
+    for (final PlanFragment planFragment : fragments) {
+      final ExecProtos.FragmentHandle fragmentHandle = planFragment.getHandle();
+      stringBuilder.append("PlanFragment(");
+      stringBuilder.append(++fragmentIndex);
+      stringBuilder.append('/');
+      stringBuilder.append(fragmentCount);
+      stringBuilder.append(") major_fragment_id ");
+      stringBuilder.append(fragmentHandle.getMajorFragmentId());
+      stringBuilder.append(" minor_fragment_id ");
+      stringBuilder.append(fragmentHandle.getMinorFragmentId());
+      stringBuilder.append('\n');
+
+      final CoordinationProtos.DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+      stringBuilder.append("  DrillbitEndpoint address ");
+      stringBuilder.append(endpointAssignment.getAddress());
+      stringBuilder.append('\n');
+
+      String jsonString = "<<malformed JSON>>";
+      stringBuilder.append("  fragment_json: ");
+      final ObjectMapper objectMapper = new ObjectMapper();
+      try {
+        final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+        jsonString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+      } catch (final Exception e) {
+        // we've already set jsonString to a fallback value
+      }
+      stringBuilder.append(jsonString);
+    }
+    return stringBuilder.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 8ce8639..391f100 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,18 +17,13 @@
  */
 package org.apache.drill.exec.work.foreman;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.drill.common.CatastrophicFailure;
-import org.apache.drill.common.EventProcessor;
-import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -36,8 +31,6 @@ import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
 import org.apache.drill.exec.physical.PhysicalPlan;
@@ -48,9 +41,7 @@ import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
-import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -63,36 +54,21 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserClientConnection;
-import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.Pointer;
-import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import com.codahale.metrics.Counter;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query where this
@@ -100,16 +76,17 @@ import io.netty.util.concurrent.GenericFutureListener;
  *
  * The flow is as follows:
  * <ul>
+ * <li>While Foreman is initialized query is in preparing state.</li>
  * <li>Foreman is submitted as a runnable.</li>
  * <li>Runnable does query planning.</li>
- * <li>state changes from PENDING to RUNNING</li>
- * <li>Runnable sends out starting fragments</li>
+ * <li>Runnable submits query to be enqueued.</li>
+ * <li>The Runnable's run() completes, but the Foreman stays around to listen to state changes.</li>
+ * <li>Once query is enqueued, starting fragments are sent out.</li>
  * <li>Status listener are activated</li>
- * <li>The Runnable's run() completes, but the Foreman stays around</li>
  * <li>Foreman listens for state change messages.</li>
- * <li>state change messages can drive the state to FAILED or CANCELED, in which case
- *   messages are sent to running fragments to terminate</li>
- * <li>when all fragments complete, state change messages drive the state to COMPLETED</li>
+ * <li>State change messages can drive the state to FAILED or CANCELED, in which case
+ *   messages are sent to running fragments to terminate.</li>
+ * <li>When all fragments is completed, state change messages drive the state to COMPLETED.</li>
  * </ul>
  */
 
@@ -118,90 +95,81 @@ public class Foreman implements Runnable {
   private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class);
 
-  public enum ProfileOption { SYNC, ASYNC, NONE };
+  public enum ProfileOption { SYNC, ASYNC, NONE }
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
-
-  private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued");
-  private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running");
-  private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed");
-  private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded");
-  private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed");
-  private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled");
 
   private final QueryId queryId;
   private final String queryIdString;
   private final RunQuery queryRequest;
   private final QueryContext queryContext;
   private final QueryManager queryManager; // handles lower-level details of query execution
-  private final WorkerBee bee; // provides an interface to submit tasks
   private final DrillbitContext drillbitContext;
   private final UserClientConnection initiatingClient; // used to send responses
-  private volatile QueryState state;
   private boolean resume = false;
   private final ProfileOption profileOption;
 
   private final QueryResourceManager queryRM;
 
   private final ResponseSendListener responseListener = new ResponseSendListener();
-  private final StateSwitch stateSwitch = new StateSwitch();
-  private final ForemanResult foremanResult = new ForemanResult();
   private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
   private final ChannelFuture closeFuture;
+  private final FragmentsRunner fragmentsRunner;
+  private final QueryStateProcessor queryStateProcessor;
 
   private String queryText;
 
   /**
    * Constructor. Sets up the Foreman, but does not initiate any execution.
    *
-   * @param bee used to submit additional work
-   * @param drillbitContext
-   * @param connection
+   * @param bee work manager (runs fragments)
+   * @param drillbitContext drillbit context
+   * @param connection connection
    * @param queryId the id for the query
    * @param queryRequest the query to execute
    */
   public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
       final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
-    this.bee = bee;
     this.queryId = queryId;
-    queryIdString = QueryIdHelper.getQueryId(queryId);
+    this.queryIdString = QueryIdHelper.getQueryId(queryId);
     this.queryRequest = queryRequest;
     this.drillbitContext = drillbitContext;
-
-    initiatingClient = connection;
-    closeFuture = initiatingClient.getChannelClosureFuture();
+    this.initiatingClient = connection;
+    this.closeFuture = initiatingClient.getChannelClosureFuture();
     closeFuture.addListener(closeListener);
 
-    queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
-    queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
+    this.queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
+    this.queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
         drillbitContext.getClusterCoordinator(), this);
+    this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
+    this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
+    this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
+    this.profileOption = setProfileOption(queryContext.getOptions());
+  }
 
-    recordNewState(QueryState.ENQUEUED);
-    enqueuedQueries.inc();
-    queryRM = drillbitContext.getResourceManager().newQueryRM(this);
 
-    profileOption = setProfileOption(queryContext.getOptions());
+  /**
+   * @return query id
+   */
+  public QueryId getQueryId() {
+    return queryId;
   }
 
-  private ProfileOption setProfileOption(OptionManager options) {
-    if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
-      return ProfileOption.NONE;
-    }
-    if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) {
-      return ProfileOption.SYNC;
-    } else {
-      return ProfileOption.ASYNC;
-    }
+  /**
+   * @return current query state
+   */
+  public QueryState getState() {
+    return queryStateProcessor.getState();
   }
 
-  private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
-    @Override
-    public void operationComplete(Future<Void> future) throws Exception {
-      cancel();
-    }
+  /**
+   * @return sql query text of the query request
+   */
+  public String getQueryText() {
+    return queryText;
   }
 
+
   /**
    * Get the QueryContext created for the query.
    *
@@ -221,12 +189,21 @@ public class Foreman implements Runnable {
   }
 
   /**
-   * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be
-   * terminated.
+   * Cancel the query (move query in cancellation requested state).
+   * Query execution will be canceled once possible.
    */
   public void cancel() {
-    // Note this can be called from outside of run() on another thread, or after run() completes
-    addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+    queryStateProcessor.cancel();
+  }
+
+  /**
+   * Adds query status in the event queue to process it when foreman is ready.
+   *
+   * @param state new query state
+   * @param exception exception if failure has occurred
+   */
+  public void addToEventQueue(QueryState state, Exception exception) {
+    queryStateProcessor.addToEventQueue(state, exception);
   }
 
   /**
@@ -255,23 +232,21 @@ public class Foreman implements Runnable {
     currentThread.setName(queryIdString + ":foreman");
     try {
       /*
-       Check if the foreman is ONLINE. If not dont accept any new queries.
+       Check if the foreman is ONLINE. If not don't accept any new queries.
       */
       if (!drillbitContext.isForemanOnline()) {
         throw new ForemanException("Query submission failed since Foreman is shutting down.");
       }
     } catch (ForemanException e) {
       logger.debug("Failure while submitting query", e);
-      addToEventQueue(QueryState.FAILED, e);
+      queryStateProcessor.addToEventQueue(QueryState.FAILED, e);
     }
-    // track how long the query takes
-    queryManager.markStartTime();
-    enqueuedQueries.dec();
-    runningQueries.inc();
+
+    queryText = queryRequest.getPlan();
+    queryStateProcessor.moveToState(QueryState.PLANNING, null);
 
     try {
       injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
-      queryText = queryRequest.getPlan();
 
       // convert a run query request into action
       switch (queryRequest.getType()) {
@@ -299,18 +274,15 @@ public class Foreman implements Runnable {
       }
       injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
     } catch (final OutOfMemoryException e) {
-      moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
+      queryStateProcessor.moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
     } catch (final ForemanException e) {
-      moveToState(QueryState.FAILED, e);
+      queryStateProcessor.moveToState(QueryState.FAILED, e);
     } catch (AssertionError | Exception ex) {
-      moveToState(QueryState.FAILED,
+      queryStateProcessor.moveToState(QueryState.FAILED,
           new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
     } catch (final OutOfMemoryError e) {
       if ("Direct buffer memory".equals(e.getMessage())) {
-        moveToState(QueryState.FAILED,
-            UserException.resourceError(e)
-                .message("One or more nodes ran out of memory while executing the query.")
-                .build(logger));
+        queryStateProcessor.moveToState(QueryState.FAILED, UserException.resourceError(e).message("One or more nodes ran out of memory while executing the query.").build(logger));
       } else {
         /*
          * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we
@@ -321,35 +293,6 @@ public class Foreman implements Runnable {
       }
 
     } finally {
-      /*
-       * Begin accepting external events.
-       *
-       * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
-       * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
-       * as a result of any partial setup that was done (such as the FragmentSubmitListener,
-       * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
-       * event delivery call.
-       *
-       * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
-       * make sure that we can't make things any worse as those events are delivered, but allow
-       * any necessary remaining cleanup to proceed.
-       *
-       * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman
-       * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
-       * to accept events.
-       */
-      try {
-        stateSwitch.start();
-      } catch (Exception ex) {
-        moveToState(QueryState.FAILED, ex);
-      }
-
-      // If we received the resume signal before fragments are setup, the first call does not actually resume the
-      // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
-      if(resume) {
-        resume();
-      }
-
       // restore the thread's original name
       currentThread.setName(originalName);
     }
@@ -361,6 +304,31 @@ public class Foreman implements Runnable {
      */
   }
 
+  /**
+   * While one fragments where sanding out, other might have been completed. We don't want to process completed / failed
+   * events until all fragments are sent out. This method triggers events processing when all fragments were sent out.
+   */
+  public void startProcessingEvents() {
+    queryStateProcessor.startProcessingEvents();
+
+    // If we received the resume signal before fragments are setup, the first call does not actually resume the
+    // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
+    if (resume) {
+      resume();
+    }
+  }
+
+  private ProfileOption setProfileOption(OptionManager options) {
+    if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
+      return ProfileOption.NONE;
+    }
+    if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) {
+      return ProfileOption.SYNC;
+    } else {
+      return ProfileOption.ASYNC;
+    }
+  }
+
   private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
     LogicalPlan logicalPlan;
     try {
@@ -436,48 +404,16 @@ public class Foreman implements Runnable {
     queryManager.setTotalCost(plan.totalCost());
     work.applyPlan(drillbitContext.getPlanReader());
     logWorkUnit(work);
-    admit(work);
-    queryManager.setQueueName(queryRM.queueName());
-
-    final List<PlanFragment> planFragments = work.getFragments();
-    final PlanFragment rootPlanFragment = work.getRootFragment();
-    assert queryId == rootPlanFragment.getHandle().getQueryId();
 
-    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
-    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
+    fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator());
 
-    logger.debug("Submitting fragments to run.");
-
-    // set up the root fragment first so we'll have incoming buffers available.
-    setupRootFragment(rootPlanFragment, work.getRootOperator());
-
-    setupNonRootFragments(planFragments);
-
-    moveToState(QueryState.RUNNING, null);
-    logger.debug("Fragments running.");
-  }
-
-  private void admit(QueryWorkUnit work) throws ForemanSetupException {
-    queryManager.markPlanningEndTime();
-    try {
-      queryRM.admit();
-    } catch (QueueTimeoutException e) {
-      throw UserException
-          .resourceError()
-          .message(e.getMessage())
-          .build(logger);
-    } catch (QueryQueueException e) {
-      throw new ForemanSetupException(e.getMessage(), e);
-    } finally {
-      queryManager.markQueueWaitEndTime();
-    }
-    moveToState(QueryState.STARTING, null);
+    startQueryProcessing();
   }
 
   /**
    * This is a helper method to run query based on the list of PlanFragment that were planned
    * at some point of time
-   * @param fragmentsList
+   * @param fragmentsList fragment list
    * @throws ExecutionSetupException
    */
   private void runFragment(List<PlanFragment> fragmentsList) throws ExecutionSetupException {
@@ -502,6 +438,8 @@ public class Foreman implements Runnable {
       }
     }
 
+    assert rootFragment != null;
+
     final FragmentRoot rootOperator;
     try {
       rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
@@ -509,26 +447,73 @@ public class Foreman implements Runnable {
       throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
     }
     queryRM.setCost(rootOperator.getCost());
-    admit(null);
-    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
-    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
-    logger.debug("Submitting fragments to run.");
+    fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);
 
-    // set up the root fragment first so we'll have incoming buffers available.
-    setupRootFragment(rootFragment, rootOperator);
+    startQueryProcessing();
+  }
 
-    setupNonRootFragments(planFragments);
+  /**
+   * Enqueues the query and once enqueued, starts sending out query fragments for further execution.
+   * Moves query to RUNNING state.
+   */
+  private void startQueryProcessing() {
+    enqueue();
+    runFragments();
+    queryStateProcessor.moveToState(QueryState.RUNNING, null);
+  }
 
-    moveToState(QueryState.RUNNING, null);
-    logger.debug("Fragments running.");
+  /**
+   * Move query to ENQUEUED state. Enqueues query if queueing is enabled.
+   * Foreman run will be blocked until query is enqueued.
+   * In case of failures (ex: queue timeout exception) will move query to FAILED state.
+   */
+  private void enqueue() {
+    queryStateProcessor.moveToState(QueryState.ENQUEUED, null);
+
+    try {
+      queryRM.admit();
+      queryStateProcessor.moveToState(QueryState.STARTING, null);
+    } catch (QueueTimeoutException | QueryQueueException e) {
+      queryStateProcessor.moveToState(QueryState.FAILED, e);
+    } finally {
+      String queueName = queryRM.queueName();
+      queryManager.setQueueName(queueName == null ? "Unknown" : queueName);
+    }
+  }
+
+  private void runFragments() {
+    try {
+      fragmentsRunner.submit();
+    } catch (Exception e) {
+      queryStateProcessor.moveToState(QueryState.FAILED, e);
+    } finally {
+       /*
+       * Begin accepting external events.
+       *
+       * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
+       * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
+       * as a result of any partial setup that was done (such as the FragmentSubmitListener,
+       * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
+       * event delivery call.
+       *
+       * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
+       * make sure that we can't make things any worse as those events are delivered, but allow
+       * any necessary remaining cleanup to proceed.
+       *
+       * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman
+       * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
+       * to accept events.
+       */
+      startProcessingEvents();
+    }
   }
 
   /**
    * Helper method to execute the query in prepared statement. Current implementation takes the query from opaque
    * object of the <code>preparedStatement</code> and submits as a new query.
    *
-   * @param preparedStatementHandle
+   * @param preparedStatementHandle prepared statement handle
    * @throws ExecutionSetupException
    */
   private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle)
@@ -559,10 +544,6 @@ public class Foreman implements Runnable {
     }
   }
 
-  Exception getCurrentException() {
-    return foremanResult.getException();
-  }
-
   private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
     final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
     final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
@@ -577,47 +558,26 @@ public class Foreman implements Runnable {
     if (! logger.isTraceEnabled()) {
       return;
     }
-    final StringBuilder sb = new StringBuilder();
-    sb.append("PlanFragments for query ");
-    sb.append(queryId);
-    sb.append('\n');
-
-    final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
-    final int fragmentCount = planFragments.size();
-    int fragmentIndex = 0;
-    for(final PlanFragment planFragment : planFragments) {
-      final FragmentHandle fragmentHandle = planFragment.getHandle();
-      sb.append("PlanFragment(");
-      sb.append(++fragmentIndex);
-      sb.append('/');
-      sb.append(fragmentCount);
-      sb.append(") major_fragment_id ");
-      sb.append(fragmentHandle.getMajorFragmentId());
-      sb.append(" minor_fragment_id ");
-      sb.append(fragmentHandle.getMinorFragmentId());
-      sb.append('\n');
-
-      final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
-      sb.append("  DrillbitEndpoint address ");
-      sb.append(endpointAssignment.getAddress());
-      sb.append('\n');
-
-      String jsonString = "<<malformed JSON>>";
-      sb.append("  fragment_json: ");
-      final ObjectMapper objectMapper = new ObjectMapper();
-      try
-      {
-        final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
-        jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
-      } catch(final Exception e) {
-        // we've already set jsonString to a fallback value
-      }
-      sb.append(jsonString);
+    logger.trace(String.format("PlanFragments for query %s \n%s",
+        queryId, queryWorkUnit.stringifyFragments()));
+  }
+
+  private void runSQL(final String sql) throws ExecutionSetupException {
+    final Pointer<String> textPlan = new Pointer<>();
+    final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
+    queryManager.setPlanText(textPlan.value);
+    runPhysicalPlan(plan);
+  }
 
-      logger.trace(sb.toString());
+  private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence()));
     }
+    return new BasicOptimizer(queryContext, initiatingClient).optimize(
+        new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
   }
 
+
   /**
    * Manages the end-state processing for Foreman.
    *
@@ -631,7 +591,7 @@ public class Foreman implements Runnable {
    * The idea here is to make close()ing the ForemanResult do the final cleanup and
    * sending. Closing the result must be the last thing that is done by Foreman.
    */
-  private class ForemanResult implements AutoCloseable {
+  public class ForemanResult implements AutoCloseable {
     private QueryState resultState = null;
     private volatile Exception resultException = null;
     private boolean isClosed = false;
@@ -688,7 +648,7 @@ public class Foreman implements Runnable {
      * @param exception the exception to add
      */
     private void addException(final Exception exception) {
-      Preconditions.checkNotNull(exception);
+      assert exception != null;
 
       if (resultException == null) {
         resultException = exception;
@@ -741,7 +701,7 @@ public class Foreman implements Runnable {
             queryText,
             new Date(queryContext.getQueryContextInfo().getQueryStartTime()),
             new Date(System.currentTimeMillis()),
-            state,
+            queryStateProcessor.getState(),
             queryContext.getSession().getCredentials().getUserName(),
             initiatingClient.getRemoteAddress());
         queryLogger.info(MAPPER.writeValueAsString(q));
@@ -756,9 +716,6 @@ public class Foreman implements Runnable {
       Preconditions.checkState(!isClosed);
       Preconditions.checkState(resultState != null);
 
-      // to track how long the query takes
-      queryManager.markEndTime();
-
       logger.debug(queryIdString + ": cleaning up.");
       injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
 
@@ -780,11 +737,11 @@ public class Foreman implements Runnable {
        *
        * We only need to do this if the resultState differs from the last recorded state
        */
-      if (resultState != state) {
+      if (resultState != queryStateProcessor.getState()) {
         suppressingClose(new AutoCloseable() {
           @Override
           public void close() throws Exception {
-            recordNewState(resultState);
+            queryStateProcessor.recordNewState(resultState);
           }
         });
       }
@@ -842,7 +799,7 @@ public class Foreman implements Runnable {
       }
 
       // Remove the Foreman from the running query list.
-      bee.retireForeman(Foreman.this);
+      fragmentsRunner.getBee().retireForeman(Foreman.this);
 
       try {
         queryManager.close();
@@ -850,21 +807,9 @@ public class Foreman implements Runnable {
         logger.warn("unable to close query manager", e);
       }
 
-      // Incrementing QueryState counters
-      switch (state) {
-        case FAILED:
-          failedQueries.inc();
-          break;
-        case CANCELED:
-          canceledQueries.inc();
-          break;
-        case COMPLETED:
-          succeededQueries.inc();
-          break;
-      }
 
-      runningQueries.dec();
-      completedQueries.inc();
+      queryStateProcessor.close();
+
       try {
         queryRM.exit();
       } finally {
@@ -873,472 +818,10 @@ public class Foreman implements Runnable {
     }
   }
 
-  private static class StateEvent {
-    final QueryState newState;
-    final Exception exception;
-
-    StateEvent(final QueryState newState, final Exception exception) {
-      this.newState = newState;
-      this.exception = exception;
-    }
-  }
-
-  private void moveToState(final QueryState newState, final Exception exception) {
-    logger.debug(queryIdString + ": State change requested {} --> {}", state, newState,
-      exception);
-    switch (state) {
-    case ENQUEUED:
-      switch (newState) {
-      case FAILED:
-        Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed");
-        recordNewState(newState);
-        foremanResult.setFailed(exception);
-        foremanResult.close();
-        return;
-      case STARTING:
-        recordNewState(newState);
-        return;
-      }
-      break;
-    case STARTING:
-      if (newState == QueryState.RUNNING) {
-        recordNewState(QueryState.RUNNING);
-        return;
-      }
-
-      //$FALL-THROUGH$
-
-    case RUNNING: {
-      /*
-       * For cases that cancel executing fragments, we have to record the new
-       * state first, because the cancellation of the local root fragment will
-       * cause this to be called recursively.
-       */
-      switch (newState) {
-      case CANCELLATION_REQUESTED: {
-        assert exception == null;
-        recordNewState(QueryState.CANCELLATION_REQUESTED);
-        queryManager.cancelExecutingFragments(drillbitContext);
-        foremanResult.setCompleted(QueryState.CANCELED);
-        /*
-         * We don't close the foremanResult until we've gotten
-         * acknowledgments, which happens below in the case for current state
-         * == CANCELLATION_REQUESTED.
-         */
-        return;
-      }
-
-      case COMPLETED: {
-        assert exception == null;
-        recordNewState(QueryState.COMPLETED);
-        foremanResult.setCompleted(QueryState.COMPLETED);
-        foremanResult.close();
-        return;
-      }
-
-      case FAILED: {
-        assert exception != null;
-        recordNewState(QueryState.FAILED);
-        queryManager.cancelExecutingFragments(drillbitContext);
-        foremanResult.setFailed(exception);
-        foremanResult.close();
-        return;
-      }
-
-      }
-      break;
-    }
-
-    case CANCELLATION_REQUESTED:
-      if ((newState == QueryState.CANCELED)
-        || (newState == QueryState.COMPLETED)
-        || (newState == QueryState.FAILED)) {
-
-        if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
-          if (newState == QueryState.FAILED) {
-            assert exception != null;
-            recordNewState(QueryState.FAILED);
-            foremanResult.setForceFailure(exception);
-          }
-        }
-        /*
-         * These amount to a completion of the cancellation requests' cleanup;
-         * now we can clean up and send the result.
-         */
-        foremanResult.close();
-      }
-      return;
-
-    case CANCELED:
-    case COMPLETED:
-    case FAILED:
-      logger
-        .warn(
-          "Dropping request to move to {} state as query is already at {} state (which is terminal).",
-          newState, state);
-      return;
-    }
-
-    throw new IllegalStateException(String.format(
-      "Failure trying to change states: %s --> %s", state.name(),
-      newState.name()));
-  }
-
-  private class StateSwitch extends EventProcessor<StateEvent> {
-    public void addEvent(final QueryState newState, final Exception exception) {
-      sendEvent(new StateEvent(newState, exception));
-    }
-
-    @Override
-    protected void processEvent(final StateEvent event) {
-      moveToState(event.newState, event.exception);
-    }
-  }
-
-  /**
-   * Tells the foreman to move to a new state.<br>
-   * This will be added to the end of the event queue and will be processed once the foreman is ready
-   * to accept external events.
-   *
-   * @param newState the state to move to
-   * @param exception if not null, the exception that drove this state transition (usually a failure)
-   */
-  public void addToEventQueue(final QueryState newState, final Exception exception) {
-    stateSwitch.addEvent(newState, exception);
-  }
-
-  private void recordNewState(final QueryState newState) {
-    state = newState;
-    queryManager.updateEphemeralState(newState);
-  }
-
-  private void runSQL(final String sql) throws ExecutionSetupException {
-    final Pointer<String> textPlan = new Pointer<>();
-    final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
-    queryManager.setPlanText(textPlan.value);
-    runPhysicalPlan(plan);
-  }
-
-  private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence()));
-    }
-    return new BasicOptimizer(queryContext, initiatingClient).optimize(
-        new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  /**
-   * Set up the root fragment (which will run locally), and submit it for execution.
-   *
-   * @param rootFragment
-   * @param rootOperator
-   * @throws ExecutionSetupException
-   */
-  private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator)
-      throws ExecutionSetupException {
-    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
-        initiatingClient, drillbitContext.getFunctionImplementationRegistry());
-    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext);
-    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator);
-    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter);
-
-    queryManager.addFragmentStatusTracker(rootFragment, true);
-
-    // FragmentManager is setting buffer for FragmentContext
-    if (rootContext.isBuffersDone()) {
-      // if we don't have to wait for any incoming data, start the fragment runner.
-      bee.addFragmentRunner(rootRunner);
-    } else {
-      // if we do, record the fragment manager in the workBus.
-      drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
-    }
-  }
-
-  /**
-   * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node
-   * and the local Drillbit Endpoint.
-   * @param planFragment
-   * @param localEndPoint
-   * @param localFragmentList
-   * @param remoteFragmentMap
-   */
-  private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint,
-                                        final List<PlanFragment> localFragmentList,
-                                        final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
-    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
-
-    if (assignedDrillbit.equals(localEndPoint)) {
-      localFragmentList.add(planFragment);
-    } else {
-      remoteFragmentMap.put(assignedDrillbit, planFragment);
-    }
-  }
-
-  /**
-   * Send remote intermediate fragment to the assigned Drillbit node. Throw exception in case of failure to send the
-   * fragment.
-   * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's
-   */
-  private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
-
-    final int numIntFragments = remoteFragmentMap.keySet().size();
-    final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
-    final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
-
-    // send remote intermediate fragments
-    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
-    }
-
-    final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
-    if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) {
-      long numberRemaining = endpointLatch.getCount();
-      throw UserException.connectionError()
-          .message("Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
-              "Sent %d and only heard response back from %d nodes.",
-              timeout, numIntFragments, numIntFragments - numberRemaining).build(logger);
-    }
-
-    // if any of the intermediate fragment submissions failed, fail the query
-    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
-        fragmentSubmitFailures.submissionExceptions;
-
-    if (submissionExceptions.size() > 0) {
-      Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
-      StringBuilder sb = new StringBuilder();
-      boolean first = true;
-
-      for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) {
-        DrillbitEndpoint endpoint = e.drillbitEndpoint;
-        if (endpoints.add(endpoint)) {
-          if (first) {
-            first = false;
-          } else {
-            sb.append(", ");
-          }
-          sb.append(endpoint.getAddress());
-        }
-      }
-      throw UserException.connectionError(submissionExceptions.get(0).rpcException)
-          .message("Error setting up remote intermediate fragment execution")
-          .addContext("Nodes with failures", sb.toString()).build(logger);
-    }
-  }
-
-
-  /**
-   * Start the locally assigned leaf or intermediate fragment
-   * @param fragment
-   * @throws ForemanException
-   */
-  private void startLocalFragment(final PlanFragment fragment) throws ForemanException {
-
-    logger.debug("Received local fragment start instruction", fragment);
-
-    try {
-      final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment,
-          drillbitContext.getFunctionImplementationRegistry());
-      final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
-      final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter);
-
-      // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
-      if (fragment.getLeafFragment()) {
-        bee.addFragmentRunner(fragmentExecutor);
-      } else {
-        // isIntermediate, store for incoming data.
-        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
-        drillbitContext.getWorkBus().addFragmentManager(manager);
-      }
-
-    } catch (final ExecutionSetupException ex) {
-      throw new ForemanException("Failed to create fragment context", ex);
-    } catch (final Exception ex) {
-      throw new ForemanException("Failed while trying to start local fragment", ex);
-    }
-  }
-
-  /**
-   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
-   * Messages are sent immediately, so they may start returning data even before we complete this.
-   *
-   * @param fragments the fragments
-   * @throws ForemanException
-   */
-  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
-    if (fragments.isEmpty()) {
-      // nothing to do here
-      return;
-    }
-    /*
-     * We will send a single message to each endpoint, regardless of how many fragments will be
-     * executed there. We need to start up the intermediate fragments first so that they will be
-     * ready once the leaf fragments start producing data. To satisfy both of these, we will
-     * make a pass through the fragments and put them into the remote maps according to their
-     * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate
-     * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists.
-     *
-     * This will help to schedule local
-     */
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
-    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
-    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
-
-    final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
-    // record all fragments for status purposes.
-    for (final PlanFragment planFragment : fragments) {
-
-      if (logger.isTraceEnabled()) {
-        logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(),
-            planFragment.getFragmentJson());
-      }
-
-      queryManager.addFragmentStatusTracker(planFragment, false);
-
-      if (planFragment.getLeafFragment()) {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
-      } else {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
-      }
-    }
-
-    /*
-     * We need to wait for the intermediates to be sent so that they'll be set up by the time
-     * the leaves start producing data. We'll use this latch to wait for the responses.
-     *
-     * However, in order not to hang the process if any of the RPC requests fails, we always
-     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
-     * know if any submissions did fail.
-     */
-    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
-
-    // Setup local intermediate fragments
-    for (final PlanFragment fragment : localIntFragmentList) {
-      startLocalFragment(fragment);
-    }
-
-    injector.injectChecked(queryContext.getExecutionControls(), "send-fragments", ForemanException.class);
-    /*
-     * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
-     * the regular sendListener event delivery.
-     */
-    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
-    }
-
-    // Setup local leaf fragments
-    for (final PlanFragment fragment : localLeafFragmentList) {
-      startLocalFragment(fragment);
-    }
-  }
-
-  /**
-   * Send all the remote fragments belonging to a single target drillbit in one request.
-   *
-   * @param assignment the drillbit assigned to these fragments
-   * @param fragments the set of fragments
-   * @param latch the countdown latch used to track the requests to all endpoints
-   * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
-   */
-  private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
-      final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
-    @SuppressWarnings("resource")
-    final Controller controller = drillbitContext.getController();
-    final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
-    for(final PlanFragment planFragment : fragments) {
-      fb.addFragment(planFragment);
-    }
-    final InitializeFragments initFrags = fb.build();
-
-    logger.debug("Sending remote fragments to \nNode:\n{} \n\nData:\n{}", assignment, initFrags);
-    final FragmentSubmitListener listener =
-        new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
-    controller.getTunnel(assignment).sendFragments(listener, initFrags);
-  }
-
-  public QueryState getState() {
-    return state;
-  }
-
-  /**
-   * @return sql query text of the query request
-   */
-  public String getQueryText() {
-    return queryText;
-  }
-
-  /**
-   * Used by {@link FragmentSubmitListener} to track the number of submission failures.
-   */
-  private static class FragmentSubmitFailures {
-    static class SubmissionException {
-      final DrillbitEndpoint drillbitEndpoint;
-      final RpcException rpcException;
-
-      SubmissionException(final DrillbitEndpoint drillbitEndpoint,
-          final RpcException rpcException) {
-        this.drillbitEndpoint = drillbitEndpoint;
-        this.rpcException = rpcException;
-      }
-    }
-
-    final List<SubmissionException> submissionExceptions = new LinkedList<>();
-
-    void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) {
-      submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
-    }
-  }
-
-  private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments> {
-    private final CountDownLatch latch;
-    private final FragmentSubmitFailures fragmentSubmitFailures;
-
-    /**
-     * Constructor.
-     *
-     * @param endpoint the endpoint for the submission
-     * @param value the initialize fragments message
-     * @param latch the latch to count down when the status is known; may be null
-     * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
-     */
-    public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
-        final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
-      super(endpoint, value);
-      Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
-      this.latch = latch;
-      this.fragmentSubmitFailures = fragmentSubmitFailures;
-    }
-
-    @Override
-    public void success(final Ack ack, final ByteBuf byteBuf) {
-      if (latch != null) {
-        latch.countDown();
-      }
-    }
-
-    @Override
-    public void failed(final RpcException ex) {
-      if (latch != null) { // this block only applies to intermediate fragments
-        fragmentSubmitFailures.addFailure(endpoint, ex);
-        latch.countDown();
-      } else { // this block only applies to leaf fragments
-        // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
-        logger.debug("Failure while sending fragment.  Stopping query.", ex);
-        addToEventQueue(QueryState.FAILED, ex);
-      }
-    }
-
+  private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
     @Override
-    public void interrupted(final InterruptedException e) {
-      // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission.
-      // Consider the interrupt as failure.
-      final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
-      logger.error(errMsg, e);
-      failed(new RpcException(errMsg, e));
+    public void operationComplete(Future<Void> future) throws Exception {
+      cancel();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
new file mode 100644
index 0000000..ce04848
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
+import org.apache.drill.exec.work.fragment.RootFragmentManager;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Is responsible for submitting query fragments for running (locally and remotely).
+ */
+public class FragmentsRunner {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class);
+  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class);
+
+  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
+
+  private final WorkerBee bee;
+  private final UserClientConnection initiatingClient;
+  private final DrillbitContext drillbitContext;
+  private final Foreman foreman;
+
+  private List<PlanFragment> planFragments;
+  private PlanFragment rootPlanFragment;
+  private FragmentRoot rootOperator;
+
+  public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) {
+    this.bee = bee;
+    this.initiatingClient = initiatingClient;
+    this.drillbitContext = drillbitContext;
+    this.foreman = foreman;
+  }
+
+  public WorkerBee getBee() {
+    return bee;
+  }
+
+  public void setFragmentsInfo(List<PlanFragment> planFragments,
+                                  PlanFragment rootPlanFragment,
+                                  FragmentRoot rootOperator) {
+    this.planFragments = planFragments;
+    this.rootPlanFragment = rootPlanFragment;
+    this.rootOperator = rootOperator;
+  }
+
+  /**
+   * Submits root and non-root fragments fragments for running.
+   * In case of success move query to the running state.
+   */
+  public void submit() throws ExecutionSetupException {
+    assert planFragments != null;
+    assert rootPlanFragment != null;
+    assert rootOperator != null;
+
+    QueryId queryId = foreman.getQueryId();
+    assert queryId == rootPlanFragment.getHandle().getQueryId();
+
+    QueryManager queryManager = foreman.getQueryManager();
+    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
+    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
+
+    logger.debug("Submitting fragments to run.");
+    // set up the root fragment first so we'll have incoming buffers available.
+    setupRootFragment(rootPlanFragment, rootOperator);
+    setupNonRootFragments(planFragments);
+    logger.debug("Fragments running.");
+  }
+
+  /**
+   * Set up the root fragment (which will run locally), and submit it for execution.
+   *
+   * @param rootFragment root fragment
+   * @param rootOperator root operator
+   * @throws ExecutionSetupException
+   */
+  private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator) throws ExecutionSetupException {
+    QueryManager queryManager = foreman.getQueryManager();
+    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, foreman.getQueryContext(),
+        initiatingClient, drillbitContext.getFunctionImplementationRegistry());
+    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext);
+    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator);
+    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter);
+
+    queryManager.addFragmentStatusTracker(rootFragment, true);
+
+    // FragmentManager is setting buffer for FragmentContext
+    if (rootContext.isBuffersDone()) {
+      // if we don't have to wait for any incoming data, start the fragment runner.
+      bee.addFragmentRunner(rootRunner);
+    } else {
+      // if we do, record the fragment manager in the workBus.
+      drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
+    }
+  }
+
+
+  /**
+   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
+   * Messages are sent immediately, so they may start returning data even before we complete this.
+   *
+   * @param fragments the fragments
+   */
+  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ExecutionSetupException {
+    if (fragments.isEmpty()) {
+      // nothing to do here
+      return;
+    }
+    /*
+     * We will send a single message to each endpoint, regardless of how many fragments will be
+     * executed there. We need to start up the intermediate fragments first so that they will be
+     * ready once the leaf fragments start producing data. To satisfy both of these, we will
+     * make a pass through the fragments and put them into the remote maps according to their
+     * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate
+     * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists.
+     *
+     * This will help to schedule local
+     */
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+
+    final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
+    // record all fragments for status purposes.
+    for (final PlanFragment planFragment : fragments) {
+
+      if (logger.isTraceEnabled()) {
+        logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(),
+            planFragment.getFragmentJson());
+      }
+
+      foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
+
+      if (planFragment.getLeafFragment()) {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
+      } else {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
+      }
+    }
+
+    /*
+     * We need to wait for the intermediates to be sent so that they'll be set up by the time
+     * the leaves start producing data. We'll use this latch to wait for the responses.
+     *
+     * However, in order not to hang the process if any of the RPC requests fails, we always
+     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
+     * know if any submissions did fail.
+     */
+    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
+
+    // Setup local intermediate fragments
+    for (final PlanFragment fragment : localIntFragmentList) {
+      startLocalFragment(fragment);
+    }
+
+    injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);
+    /*
+     * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
+     * the regular sendListener event delivery.
+     */
+    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
+    }
+
+    // Setup local leaf fragments
+    for (final PlanFragment fragment : localLeafFragmentList) {
+      startLocalFragment(fragment);
+    }
+  }
+
+  /**
+   * Send all the remote fragments belonging to a single target drillbit in one request.
+   *
+   * @param assignment the drillbit assigned to these fragments
+   * @param fragments the set of fragments
+   * @param latch the countdown latch used to track the requests to all endpoints
+   * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
+   */
+  private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
+                                   final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+    @SuppressWarnings("resource")
+    final Controller controller = drillbitContext.getController();
+    final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
+    for(final PlanFragment planFragment : fragments) {
+      fb.addFragment(planFragment);
+    }
+    final InitializeFragments initFrags = fb.build();
+
+    logger.debug("Sending remote fragments to node: {}\nData: {}", assignment, initFrags);
+    final FragmentSubmitListener listener =
+        new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
+    controller.getTunnel(assignment).sendFragments(listener, initFrags);
+  }
+
+  /**
+   * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node
+   * and the local Drillbit Endpoint.
+   *
+   * @param planFragment plan fragment
+   * @param localEndPoint local endpoint
+   * @param localFragmentList local fragment list
+   * @param remoteFragmentMap remote fragment map
+   */
+  private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint,
+                                        final List<PlanFragment> localFragmentList,
+                                        final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
+
+    if (assignedDrillbit.equals(localEndPoint)) {
+      localFragmentList.add(planFragment);
+    } else {
+      remoteFragmentMap.put(assignedDrillbit, planFragment);
+    }
+  }
+
+  /**
+   * Send remote intermediate fragment to the assigned Drillbit node.
+   * Throw exception in case of failure to send the fragment.
+   *
+   * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's
+   */
+  private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+
+    final int numIntFragments = remoteFragmentMap.keySet().size();
+    final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
+    final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
+
+    // send remote intermediate fragments
+    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
+    }
+
+    final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
+    if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) {
+      long numberRemaining = endpointLatch.getCount();
+      throw UserException.connectionError()
+          .message("Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
+                  "Sent %d and only heard response back from %d nodes.",
+              timeout, numIntFragments, numIntFragments - numberRemaining).build(logger);
+    }
+
+    // if any of the intermediate fragment submissions failed, fail the query
+    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
+        fragmentSubmitFailures.submissionExceptions;
+
+    if (submissionExceptions.size() > 0) {
+      Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+
+      for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) {
+        DrillbitEndpoint endpoint = e.drillbitEndpoint;
+        if (endpoints.add(endpoint)) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(endpoint.getAddress());
+        }
+      }
+      throw UserException.connectionError(submissionExceptions.get(0).rpcException)
+          .message("Error setting up remote intermediate fragment execution")
+          .addContext("Nodes with failures", sb.toString()).build(logger);
+    }
+  }
+
+
+  /**
+   * Start the locally assigned leaf or intermediate fragment
+   *
+   * @param fragment fragment
+   */
+  private void startLocalFragment(final PlanFragment fragment) throws ExecutionSetupException {
+    logger.debug("Received local fragment start instruction", fragment);
+
+    final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry());
+    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
+    final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter);
+
+    // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
+    if (fragment.getLeafFragment()) {
+      bee.addFragmentRunner(fragmentExecutor);
+    } else {
+      // isIntermediate, store for incoming data.
+      final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
+      drillbitContext.getWorkBus().addFragmentManager(manager);
+    }
+  }
+
+  /**
+   * Used by {@link FragmentSubmitListener} to track the number of submission failures.
+   */
+  private static class FragmentSubmitFailures {
+    static class SubmissionException {
+      final DrillbitEndpoint drillbitEndpoint;
+      final RpcException rpcException;
+
+      SubmissionException(final DrillbitEndpoint drillbitEndpoint,
+                          final RpcException rpcException) {
+        this.drillbitEndpoint = drillbitEndpoint;
+        this.rpcException = rpcException;
+      }
+    }
+
+    final List<SubmissionException> submissionExceptions = new LinkedList<>();
+
+    void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) {
+      submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
+    }
+  }
+
+  private class FragmentSubmitListener extends EndpointListener<GeneralRPCProtos.Ack, InitializeFragments> {
+    private final CountDownLatch latch;
+    private final FragmentSubmitFailures fragmentSubmitFailures;
+
+    /**
+     * Constructor.
+     *
+     * @param endpoint the endpoint for the submission
+     * @param value the initialize fragments message
+     * @param latch the latch to count down when the status is known; may be null
+     * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
+     */
+    public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
+                                  final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+      super(endpoint, value);
+      Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
+      this.latch = latch;
+      this.fragmentSubmitFailures = fragmentSubmitFailures;
+    }
+
+    @Override
+    public void success(final GeneralRPCProtos.Ack ack, final ByteBuf byteBuf) {
+      if (latch != null) {
+        latch.countDown();
+      }
+    }
+
+    @Override
+    public void failed(final RpcException ex) {
+      if (latch != null) { // this block only applies to intermediate fragments
+        fragmentSubmitFailures.addFailure(endpoint, ex);
+        latch.countDown();
+      } else { // this block only applies to leaf fragments
+        // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
+        logger.debug("Failure while sending fragment.  Stopping query.", ex);
+        foreman.addToEventQueue(QueryState.FAILED, ex);
+      }
+    }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission.
+      // Consider the interrupt as failure.
+      final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
+      logger.error(errMsg, e);
+      failed(new RpcException(errMsg, e));
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 216a80d..addd8fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -280,14 +280,15 @@ public class QueryManager implements AutoCloseable {
   }
 
   void updateEphemeralState(final QueryState queryState) {
-      // If query is already in zk transient store, ignore the transient state update option.
-      // Else, they will not be removed from transient store upon completion.
-      if (!inTransientStore &&
-          !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
-        return;
-      }
+    // If query is already in zk transient store, ignore the transient state update option.
+    // Else, they will not be removed from transient store upon completion.
+    if (!inTransientStore && !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
+      return;
+    }
 
-      switch (queryState) {
+    switch (queryState) {
+      case PREPARING:
+      case PLANNING:
       case ENQUEUED:
       case STARTING:
       case RUNNING:
@@ -295,15 +296,14 @@ public class QueryManager implements AutoCloseable {
         runningProfileStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
         inTransientStore = true;
         break;
-
       case COMPLETED:
       case CANCELED:
       case FAILED:
         try {
           runningProfileStore.remove(stringQueryId);
           inTransientStore = false;
-        } catch(final Exception e) {
-          logger.warn("Failure while trying to delete the estore profile for this query.", e);
+        } catch (final Exception e) {
+          logger.warn("Failure while trying to delete the stored profile for the query [{}]", stringQueryId, e);
         }
         break;