You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2016/10/17 05:45:15 UTC
[2/4] lens git commit: LENS-743: Query retry framework for retrying
upon transient failures
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java
new file mode 100644
index 0000000..6169744
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.query.events;
+
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryStatus;
+
+/**
+ * The Class StatusChange.
+ */
+public abstract class StatusChange extends QueryEvent<QueryStatus.Status> {
+
+ /**
+ * Instantiates a new status change.
+ *
+ * @param eventTime the event time
+ * @param prev the prev
+ * @param current the current
+ * @param handle the handle
+ */
+ public StatusChange(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle) {
+ super(eventTime, prev, current, handle);
+ }
+
+ /**
+ * Check current state.
+ *
+ * @param status the status
+ */
+ protected void checkCurrentState(QueryStatus.Status status) {
+ if (currentValue != status) {
+ throw new IllegalStateException("Invalid query state: " + currentValue + " query:" + queryHandle.toString());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java
new file mode 100644
index 0000000..5ea5710
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server.api.retry;
+
+import java.io.Serializable;
+
+/**
+ * A backoff retry handler.
+ *
+ * This allows a backoff on any call, so provides methods whether we can try the operation now,
+ * whats next time when operation can be performed and whether operation has exhausted all retries.
+ * Callers of this can do the following :
+ *
+ * if (handler.canTryOpNow(FailureContext)) {
+ * try {
+ * tryCallerOperation();
+ * FailureContext.clear();
+ * } catch (any Transient Exception) {
+ * FailureContext.updateFailure();
+ * if (!handler.hasExhaustedRetries(FailureContext)) {
+ * // will be tried later again
+ * }
+ * throw exception;
+ * }
+ * }
+ *
+ * Note that this is only one of the possible use cases, other complex use cases are in retry framework.
+ */
+public interface BackOffRetryHandler<FC extends FailureContext> extends Serializable {
+
+ /**
+ * To know whether operation can be done now.
+ *
+ * @param failContext FailureContext holding failures till now.
+ *
+ * @return true if operation can be done now, false otherwise.
+ */
+ boolean canTryOpNow(FC failContext);
+
+ /**
+ * Get the time when the operation can be done next.
+ *
+ * @param failContext FC holding failures till now.
+ *
+ * @return Next operation time in millis since epoch
+ */
+ long getOperationNextTime(FC failContext);
+
+ /**
+ * Has the operation exhausted all its retries
+ *
+ * @param failContext FC holding failures till now.
+ *
+ * @return true if all retries have exhausted, false otherwise.
+ */
+ boolean hasExhaustedRetries(FC failContext);
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java
new file mode 100644
index 0000000..46526ad
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.retry;
+
+import java.util.List;
+
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+import lombok.Data;
+
+@Data
+public class ChainedRetryPolicyDecider<FC extends FailureContext> implements RetryPolicyDecider<FC> {
+ private final Iterable<RetryPolicyDecider<FC>> policyDeciders;
+
+ @Override
+ public BackOffRetryHandler<FC> decidePolicy(String errorMessage) {
+ for (RetryPolicyDecider<FC> policyDecider : policyDeciders) {
+ BackOffRetryHandler<FC> policy = policyDecider.decidePolicy(errorMessage);
+ if (policy != null) {
+ return policy;
+ }
+ }
+ return new NoRetryHandler<>();
+ }
+ public static <FC extends FailureContext> ChainedRetryPolicyDecider<FC> from(Configuration conf, String key)
+ throws LensException {
+ String[] classNames = conf.getStrings(key);
+ List<RetryPolicyDecider<FC>> retryPolicyDeciders = Lists.newArrayList();
+ if (classNames != null) {
+ for (String className: classNames) {
+ Class<? extends RetryPolicyDecider<FC>> clazz;
+ try {
+ clazz = (Class<? extends RetryPolicyDecider<FC>>) conf.getClassByName(className)
+ .asSubclass(RetryPolicyDecider.class);
+ } catch (ClassNotFoundException e) {
+ throw new LensException("Couldn't load class " + className, e);
+ }
+ RetryPolicyDecider<FC> instance;
+ try {
+ instance = clazz.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new LensException("Couldn't create instance of class " + clazz.getName(), e);
+ }
+ if (instance instanceof Configurable) {
+ ((Configurable) instance).setConf(conf);
+ }
+ retryPolicyDeciders.add(instance);
+ }
+ }
+ return new ChainedRetryPolicyDecider<>(retryPolicyDeciders);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java
new file mode 100644
index 0000000..0f5ba26
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.retry;
+
+public class DefaultRetryPolicyDecider<FC extends FailureContext> implements RetryPolicyDecider<FC> {
+
+ @Override
+ public BackOffRetryHandler<FC> decidePolicy(String errorMessage) {
+ return new NoRetryHandler<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java
new file mode 100644
index 0000000..24e3d5a
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.retry;
+
+/**
+ * Any data structure that has fail and retry capability should deal with an implementation of this interface.
+ * @see org.apache.lens.server.api.query.StatusUpdateFailureContext
+ * @see org.apache.lens.server.api.query.QueryContext
+ */
+public interface FailureContext {
+ /**
+ *
+ * @return Last time of failure
+ */
+ long getLastFailedTime();
+
+ /**
+ *
+ * @return number of times failure has occured so far
+ */
+ int getFailCount();
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java
new file mode 100644
index 0000000..01da25d
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server.api.retry;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+
+/**
+ * A exponential backoff retry handler.
+ *
+ * It allows the the failures to be retried at a next update time, which can increase exponentially.
+ *
+ */
+public class FibonacciExponentialBackOffRetryHandler<FC extends FailureContext> implements BackOffRetryHandler<FC> {
+ final int[] fibonacci;
+ final long maxDelay;
+ final long waitMillis;
+
+ public FibonacciExponentialBackOffRetryHandler(int numRetries, long maxDelay, long waitMillis) {
+ checkArgument(numRetries > 2);
+ fibonacci = new int[numRetries];
+ fibonacci[0] = 1;
+ fibonacci[1] = 1;
+ for(int i = 2; i < numRetries; ++i) {
+ fibonacci[i] = fibonacci[i-1] + fibonacci[i-2];
+ }
+ this.maxDelay = maxDelay;
+ this.waitMillis = waitMillis;
+ }
+
+ public boolean canTryOpNow(FC failContext) {
+ synchronized (failContext) {
+ if (failContext.getFailCount() != 0) {
+ long now = System.currentTimeMillis();
+ if (now < getOperationNextTime(failContext)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ public long getOperationNextTime(FC failContext) {
+ synchronized (failContext) {
+ if (failContext.getFailCount() >= fibonacci.length) {
+ return failContext.getLastFailedTime() + maxDelay;
+ }
+ long delay = Math.min(maxDelay, fibonacci[failContext.getFailCount()] * waitMillis);
+ return failContext.getLastFailedTime() + delay;
+ }
+ }
+
+ public boolean hasExhaustedRetries(FC failContext) {
+ synchronized (failContext) {
+ if (failContext.getFailCount() >= fibonacci.length) {
+ return true;
+ }
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java
new file mode 100644
index 0000000..c1c0126
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.retry;
+
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class ImmediateRetryHandler<FC extends FailureContext> implements BackOffRetryHandler<FC> {
+ private final int retries;
+ private int retriesDone = 0;
+ // default 1 retry
+ public ImmediateRetryHandler() {
+ this(1);
+ }
+
+ @Override
+ public boolean canTryOpNow(FC failContext) {
+ return true;
+ }
+
+ @Override
+ public long getOperationNextTime(FC failContext) {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean hasExhaustedRetries(FC failContext) {
+ return ++retriesDone > retries;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java
new file mode 100644
index 0000000..df68a48
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.retry;
+
+public class NoRetryHandler<FC extends FailureContext> extends ImmediateRetryHandler<FC> {
+ NoRetryHandler() {
+ super(0);
+ }
+
+ @Override
+ public boolean canTryOpNow(FC failContext) {
+ return false;
+ }
+
+ @Override
+ public long getOperationNextTime(FC failContext) {
+ return Long.MAX_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java
new file mode 100644
index 0000000..35fdaca
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server.api.retry;
+
+/**
+ * Factory which creates operation retry handler
+ */
+public class OperationRetryHandlerFactory {
+ private OperationRetryHandlerFactory() {
+ }
+
+ /**
+ * Create exponential backoff handler
+ *
+ * @param numRetries Number of exponential backoff retries
+ * @param maxDelay Maximum delay an operation can wait for next
+ * @param waitMillis Number of millis that would grow exponentially incase of failures
+ *
+ * @return BackOffRetryHandler
+ */
+ public static <FC extends FailureContext> BackOffRetryHandler<FC> createExponentialBackOffHandler(int numRetries,
+ long maxDelay, long waitMillis) {
+ return new FibonacciExponentialBackOffRetryHandler<>(numRetries, maxDelay, waitMillis);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java
new file mode 100644
index 0000000..88448b0
--- /dev/null
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.retry;
+
+
+public interface RetryPolicyDecider<FC extends FailureContext> {
+
+ BackOffRetryHandler<FC> decidePolicy(String errorMessage);
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java b/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java
index 8261d8a..9d732c1 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.lens.server.api.common.ConfigBasedObjectCreationFactory;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.ImmutableSet;
@@ -69,26 +70,23 @@ public final class LensUtil {
public static boolean isSocketException(@NonNull Throwable e) {
Throwable cause = getCause(e);
- if (cause instanceof SocketException || cause instanceof SocketTimeoutException) {
- return true;
- }
- return false;
+ return cause instanceof SocketException || cause instanceof SocketTimeoutException;
}
- public static <T> ImmutableSet<T> getImplementations(final String factoriesKey, final Configuration conf) {
-
+ public static <T> Set<T> getImplementationsMutable(final String factoriesKey, final Configuration conf) {
Set<T> implSet = Sets.newLinkedHashSet();
final String[] factoryNames = conf.getStrings(factoriesKey);
-
- if (factoryNames == null) {
- return ImmutableSet.copyOf(implSet);
- }
-
- for (String factoryName : factoryNames) {
- if (StringUtils.isNotBlank(factoryName)) {
- final T implementation = getImplementation(factoryName.trim(), conf);
- implSet.add(implementation);
+ if (factoryNames != null) {
+ for (String factoryName : factoryNames) {
+ if (StringUtils.isNotBlank(factoryName)) {
+ final T implementation = getImplementation(factoryName.trim(), conf);
+ implSet.add(implementation);
+ }
}
}
+ return implSet;
+ }
+ public static <T> ImmutableSet<T> getImplementations(final String factoriesKey, final Configuration conf) {
+ Set<T> implSet = getImplementationsMutable(factoriesKey, conf);
return ImmutableSet.copyOf(implSet);
}
@@ -97,7 +95,11 @@ public final class LensUtil {
try {
ConfigBasedObjectCreationFactory<T> factory
= (ConfigBasedObjectCreationFactory<T>) Class.forName(factoryName).newInstance();
- return factory.create(conf);
+ T ret = factory.create(conf);
+ if (ret instanceof Configurable) {
+ ((Configurable) ret).setConf(conf);
+ }
+ return ret;
} catch (final ReflectiveOperationException e) {
throw new IllegalStateException(e);
}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java b/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java
deleted file mode 100644
index 5f407af..0000000
--- a/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.server.api.common;
-
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-import org.testng.annotations.Test;
-
-public class TestExponentialBackOffRetryHandler {
-
- @Test
- public void testExponentialBackOff() {
- FailureContext failures = new FailureContext();
- BackOffRetryHandler retryHandler = OperationRetryHandlerFactory.createExponentialBackOffHandler(10, 10000, 1000);
- assertFalse(retryHandler.hasExhaustedRetries(failures));
- assertTrue(retryHandler.canTryOpNow(failures));
-
- long now = System.currentTimeMillis();
- failures.updateFailure();
- assertFalse(retryHandler.hasExhaustedRetries(failures));
- assertFalse(retryHandler.canTryOpNow(failures));
- assertTrue(now + 500 < retryHandler.getOperationNextTime(failures));
- assertTrue(now + 15000 > retryHandler.getOperationNextTime(failures));
-
- for (int i = 0; i < 10; i++) {
- failures.updateFailure();
- }
- assertTrue(retryHandler.hasExhaustedRetries(failures));
- assertFalse(retryHandler.canTryOpNow(failures));
-
- failures.clear();
- assertFalse(retryHandler.hasExhaustedRetries(failures));
- assertTrue(retryHandler.canTryOpNow(failures));
- }
-}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
index 3c18ac7..2a2963f 100644
--- a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
+++ b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
@@ -58,7 +58,7 @@ public class MockDriver extends AbstractLensDriver {
/**
* The conf.
*/
- Configuration conf;
+ protected Configuration conf;
/**
* The query.
@@ -99,7 +99,7 @@ public class MockDriver extends AbstractLensDriver {
this.conf = conf;
ioTestVal = conf.getInt("mock.driver.test.val", -1);
this.conf.addResource(getDriverResourcePath("failing-query-driver-site.xml"));
- getQueryHook().setDriver(this);
+ loadQueryHook();
}
@Override
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java
index a530e9d..1560bf1 100644
--- a/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java
+++ b/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java
@@ -24,10 +24,11 @@ import static org.testng.Assert.*;
import java.util.List;
import org.apache.lens.api.LensConf;
-import org.apache.lens.server.api.common.*;
import org.apache.lens.server.api.driver.LensDriver;
import org.apache.lens.server.api.driver.MockDriver;
import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.retry.BackOffRetryHandler;
+import org.apache.lens.server.api.retry.FibonacciExponentialBackOffRetryHandler;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java
new file mode 100644
index 0000000..cc58751
--- /dev/null
+++ b/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.query.comparators;
+
+import static org.testng.Assert.*;
+
+import java.util.Comparator;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import lombok.Data;
+
+public class ChainedComparatorTest {
+ @Data
+ private static class Tuple {
+ final Integer a, b, c;
+ }
+
+ private Tuple tuple(Integer a, Integer b, Integer c) {
+ return new Tuple(a, b, c);
+ }
+
+ public static final ChainedComparator<Tuple> COMPARATOR = new ChainedComparator<>(Lists.newArrayList(
+ new Comparator<Tuple>() {
+ @Override
+ public int compare(Tuple o1, Tuple o2) {
+ return o1.getA().compareTo(o2.getA());
+ }
+ },
+ new Comparator<Tuple>() {
+
+ @Override
+ public int compare(Tuple o1, Tuple o2) {
+ return o1.getB().compareTo(o2.getB());
+ }
+ },
+ new Comparator<Tuple>() {
+ @Override
+ public int compare(Tuple o1, Tuple o2) {
+ return o1.getC().compareTo(o2.getC());
+ }
+ }
+ ));
+
+ @DataProvider
+ public Object[][] comparisonData() {
+ return new Object[][]{
+ {tuple(0, 0, 0), tuple(0, 0, 0), 0},
+ {tuple(0, 0, 1), tuple(0, 0, 0), 1},
+ {tuple(0, 0, 1), tuple(0, 0, 4), -1},
+ {tuple(0, 0, 1), tuple(1, 0, 4), -1},
+ {tuple(0, 0, 1), tuple(0, -10, 4), 1},
+ };
+ }
+
+ @Test(dataProvider = "comparisonData")
+ public void testCompare(Tuple a, Tuple b, int expected) throws Exception {
+ assertEquals(COMPARATOR.compare(a, b), expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java
index 122409b..2667ebf 100644
--- a/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java
+++ b/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java
@@ -24,7 +24,8 @@ import static org.apache.lens.server.api.LensServerAPITestUtil.getConfiguration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import java.util.HashSet;
import java.util.Set;
@@ -164,9 +165,13 @@ public class MaxConcurrentDriverQueriesConstraintTest {
when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver);
when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(currentDriverLaunchedQueries);
- boolean actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
+ String actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
- assertEquals(actualCanLaunch, expectedCanLaunch);
+ if (expectedCanLaunch) {
+ assertNull(actualCanLaunch);
+ } else {
+ assertNotNull(actualCanLaunch);
+ }
}
@Test(dataProvider = "dpTestConcurrentLaunches")
@@ -186,9 +191,13 @@ public class MaxConcurrentDriverQueriesConstraintTest {
when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(currentDriverLaunchedQueries);
when(mockLaunchedQueries.getQueries(mockDriver)).thenReturn(queries);
- boolean actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
+ String actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
- assertEquals(actualCanLaunch, expectedCanLaunch);
+ if (expectedCanLaunch) {
+ assertNull(actualCanLaunch);
+ } else {
+ assertNotNull(actualCanLaunch);
+ }
}
@Test(dataProvider = "dpTestPerQueueConstraints")
@@ -208,9 +217,13 @@ public class MaxConcurrentDriverQueriesConstraintTest {
QueryContext mockCandidateQuery = mock(QueryContext.class);
when(mockCandidateQuery.getQueue()).thenReturn(candidateQueue);
when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver);
- boolean actualCanLaunch = perQueueConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
+ String actualCanLaunch = perQueueConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
- assertEquals(actualCanLaunch, expectedCanLaunch);
+ if (expectedCanLaunch) {
+ assertNull(actualCanLaunch);
+ } else {
+ assertNotNull(actualCanLaunch);
+ }
}
@Test(dataProvider = "dpTestPerPriorityConstraints")
@@ -230,9 +243,13 @@ public class MaxConcurrentDriverQueriesConstraintTest {
QueryContext mockCandidateQuery = mock(QueryContext.class);
when(mockCandidateQuery.getPriority()).thenReturn(candidatePriority);
when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver);
- boolean actualCanLaunch = perPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
+ String actualCanLaunch = perPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
- assertEquals(actualCanLaunch, expectedCanLaunch);
+ if (expectedCanLaunch) {
+ assertNull(actualCanLaunch);
+ } else {
+ assertNotNull(actualCanLaunch);
+ }
}
@Test(dataProvider = "dpTestPerQueuePerPriorityConstraints")
@@ -254,8 +271,12 @@ public class MaxConcurrentDriverQueriesConstraintTest {
when(mockCandidateQuery.getQueue()).thenReturn(candidateQueue);
when(mockCandidateQuery.getPriority()).thenReturn(candidatePriority);
when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver);
- boolean actualCanLaunch = perQueueAndPerPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
+ String actualCanLaunch = perQueueAndPerPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries);
- assertEquals(actualCanLaunch, expectedCanLaunch);
+ if (expectedCanLaunch) {
+ assertNull(actualCanLaunch);
+ } else {
+ assertNotNull(actualCanLaunch);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java b/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java
new file mode 100644
index 0000000..26261dd
--- /dev/null
+++ b/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.api.retry;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.lens.server.api.query.StatusUpdateFailureContext;
+
+import org.testng.annotations.Test;
+
+public class TestExponentialBackOffRetryHandler {
+
+ @Test
+ public void testExponentialBackOff() {
+ StatusUpdateFailureContext failures = new StatusUpdateFailureContext();
+ BackOffRetryHandler<StatusUpdateFailureContext> retryHandler
+ = OperationRetryHandlerFactory.createExponentialBackOffHandler(10, 10000, 1000);
+ assertFalse(retryHandler.hasExhaustedRetries(failures));
+ assertTrue(retryHandler.canTryOpNow(failures));
+
+ long now = System.currentTimeMillis();
+ failures.updateFailure();
+ assertFalse(retryHandler.hasExhaustedRetries(failures));
+ assertFalse(retryHandler.canTryOpNow(failures));
+ assertTrue(now + 500 < retryHandler.getOperationNextTime(failures));
+ assertTrue(now + 15000 > retryHandler.getOperationNextTime(failures));
+
+ for (int i = 0; i < 10; i++) {
+ failures.updateFailure();
+ }
+ assertTrue(retryHandler.hasExhaustedRetries(failures));
+ assertFalse(retryHandler.canTryOpNow(failures));
+
+ failures.clear();
+ assertFalse(retryHandler.hasExhaustedRetries(failures));
+ assertTrue(retryHandler.canTryOpNow(failures));
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/pom.xml
----------------------------------------------------------------------
diff --git a/lens-server/pom.xml b/lens-server/pom.xml
index 6dea9a7..d24dc1e 100644
--- a/lens-server/pom.xml
+++ b/lens-server/pom.xml
@@ -372,7 +372,7 @@
<environmentVariables>
<MVN_CLASSPATH_FILE>${mvn.classpath.file}</MVN_CLASSPATH_FILE>
</environmentVariables>
- <argLine>-Xms256m -Xmx512m -XX:PermSize=256m -XX:MaxPermSize=256m</argLine>
+ <argLine>-Xms256m -Xmx1024m -XX:PermSize=256m -XX:MaxPermSize=256m</argLine>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java
index b88c717..9f14396 100644
--- a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java
@@ -39,7 +39,7 @@ import org.apache.lens.server.api.health.HealthStatus;
import org.apache.lens.server.api.metastore.CubeMetastoreService;
import org.apache.lens.server.api.metrics.*;
import org.apache.lens.server.api.query.QueryExecutionService;
-import org.apache.lens.server.api.query.StatusChange;
+import org.apache.lens.server.api.query.events.StatusChange;
import org.apache.lens.server.api.session.*;
import org.apache.lens.server.healthcheck.LensServiceHealthCheck;
import org.apache.lens.server.query.QueryExecutionServiceImpl;
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java b/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java
deleted file mode 100644
index 75c1146..0000000
--- a/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.server.query;
-
-import org.apache.lens.server.api.query.QueryContext;
-
-public class FIFOQueryComparator implements QueryComparator {
-
- @Override
- public int compare(QueryContext o1, QueryContext o2) {
-
- Long submitTimeO1 = o1.getSubmissionTime();
- Long submitTimeO2 = o2.getSubmissionTime();
-
- return submitTimeO1.compareTo(submitTimeO2);
- }
-}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
index a540c3c..3ab3aef 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
@@ -28,6 +28,7 @@ import java.util.List;
import javax.sql.DataSource;
import org.apache.lens.api.LensConf;
+import org.apache.lens.api.query.FailedAttempt;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.server.api.error.LensException;
@@ -35,15 +36,12 @@ import org.apache.lens.server.api.query.FinishedLensQuery;
import org.apache.lens.server.util.UtilityMethods;
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.dbutils.BasicRowProcessor;
-import org.apache.commons.dbutils.BeanProcessor;
-import org.apache.commons.dbutils.QueryRunner;
-import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.commons.dbutils.RowProcessor;
+import org.apache.commons.dbutils.*;
import org.apache.commons.dbutils.handlers.BeanHandler;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
/**
@@ -93,7 +91,7 @@ public class LensServerDAO {
+ "metadata varchar(100000), " + "rows int, " + "filesize bigint, " + "errormessage varchar(10000), "
+ "driverstarttime bigint, " + "driverendtime bigint, " + "drivername varchar(10000), "
+ "queryname varchar(255), " + "submissiontime bigint, " + "driverquery varchar(1000000), "
- + "conf varchar(100000))";
+ + "conf varchar(100000), numfailedattempts int)";
try {
QueryRunner runner = new QueryRunner(ds);
runner.update(sql);
@@ -102,6 +100,18 @@ public class LensServerDAO {
log.warn("Unable to create finished queries table", e);
}
}
+ public void createFailedAttemptsTable() throws Exception {
+ String sql = "CREATE TABLE if not exists failed_attempts (handle varchar(255) not null,"
+ + "attempt_number int, drivername varchar(10000), progress float, progressmessage varchar(10000), "
+ + "errormessage varchar(10000), driverstarttime bigint, driverendtime bigint)";
+ try {
+ QueryRunner runner = new QueryRunner(ds);
+ runner.update(sql);
+ log.info("Created failed_attempts table");
+ } catch (SQLException e) {
+ log.error("Unable to create failed_attempts table", e);
+ }
+ }
/**
* DAO method to insert a new Finished query into Table.
@@ -115,14 +125,26 @@ public class LensServerDAO {
// The expected case
String sql = "insert into finished_queries (handle, userquery, submitter, priority, "
+ "starttime,endtime,result,status,metadata,rows,filesize,"
- + "errormessage,driverstarttime,driverendtime, drivername, queryname, submissiontime, driverquery, conf)"
- + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
- QueryRunner runner = new QueryRunner(ds);
- runner.update(sql, query.getHandle(), query.getUserQuery(), query.getSubmitter(), query.getPriority(),
- query.getStartTime(), query.getEndTime(), query.getResult(), query.getStatus(), query.getMetadata(),
- query.getRows(), query.getFileSize(), query.getErrorMessage(), query.getDriverStartTime(),
- query.getDriverEndTime(), query.getDriverName(), query.getQueryName(), query.getSubmissionTime(),
- query.getDriverQuery(), serializeConf(query.getConf()));
+ + "errormessage,driverstarttime,driverendtime, drivername, queryname, submissiontime, driverquery, conf, "
+ + "numfailedattempts)"
+ + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ Connection conn = null;
+ try {
+ conn = getConnection();
+ conn.setAutoCommit(false);
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, sql, query.getHandle(), query.getUserQuery(), query.getSubmitter(), query.getPriority(),
+ query.getStartTime(), query.getEndTime(), query.getResult(), query.getStatus(), query.getMetadata(),
+ query.getRows(), query.getFileSize(), query.getErrorMessage(), query.getDriverStartTime(),
+ query.getDriverEndTime(), query.getDriverName(), query.getQueryName(), query.getSubmissionTime(),
+ query.getDriverQuery(), serializeConf(query.getConf()), query.getFailedAttempts().size());
+ for (int i = 0; i < query.getFailedAttempts().size(); i++) {
+ insertFailedAttempt(runner, conn, query.getHandle(), query.getFailedAttempts().get(i), i);
+ }
+ conn.commit();
+ } finally {
+ DbUtils.closeQuietly(conn);
+ }
} else {
log.warn("Re insert happening in purge: " + Thread.currentThread().getStackTrace());
if (alreadyExisting.equals(query)) {
@@ -135,6 +157,50 @@ public class LensServerDAO {
}
}
}
+ /**
+ * DAO method to insert a new Finished query into Table.
+ *
+ *
+ * @param runner
+ * @param conn
+ *@param handle to be inserted
+ * @param index @throws SQLException the exception
+ */
+ public void insertFailedAttempt(QueryRunner runner, Connection conn, String handle, FailedAttempt attempt, int index)
+ throws SQLException {
+ String sql = "insert into failed_attempts(handle, attempt_number, drivername, progress, progressmessage, "
+ + "errormessage, driverstarttime, driverendtime) values (?, ?, ?, ?, ?, ?, ?, ?)";
+ runner.update(conn, sql, handle, index, attempt.getDriverName(),
+ attempt.getProgress(), attempt.getProgressMessage(), attempt.getErrorMessage(),
+ attempt.getDriverStartTime(), attempt.getDriverFinishTime());
+ }
+
+ public void getFailedAttempts(final FinishedLensQuery query) {
+ if (query != null) {
+ String handle = query.getHandle();
+ ResultSetHandler<List<FailedAttempt>> rsh = new BeanHandler<List<FailedAttempt>>(null) {
+ @Override
+ public List<FailedAttempt> handle(ResultSet rs) throws SQLException {
+ List<FailedAttempt> attempts = Lists.newArrayList();
+ while (rs.next()) {
+ FailedAttempt attempt = new FailedAttempt(rs.getString(3), rs.getDouble(4), rs.getString(5),
+ rs.getString(6), rs.getLong(7), rs.getLong(8));
+ attempts.add(attempt);
+ }
+ return attempts;
+ }
+ };
+ String sql = "select * from failed_attempts where handle=? order by attempt_number";
+ QueryRunner runner = new QueryRunner(ds);
+ try {
+ query.setFailedAttempts(runner.query(sql, rsh, handle));
+ } catch (SQLException e) {
+ log.error("SQL exception while executing query.", e);
+ }
+ }
+ }
+
+
private String serializeConf(LensConf conf) {
return Base64.encodeBase64String(conf.toXMLString().getBytes(Charset.defaultCharset()));
@@ -157,7 +223,9 @@ public class LensServerDAO {
String sql = "select * from finished_queries where handle=?";
QueryRunner runner = new QueryRunner(ds);
try {
- return runner.query(sql, rsh, handle);
+ FinishedLensQuery finishedQuery = runner.query(sql, rsh, handle);
+ getFailedAttempts(finishedQuery);
+ return finishedQuery;
} catch (SQLException e) {
log.error("SQL exception while executing query.", e);
}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java
deleted file mode 100644
index 67dda6b..0000000
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.lens.server.query;
-
-import java.util.Comparator;
-
-import org.apache.lens.server.api.query.QueryContext;
-
-public interface QueryComparator extends Comparator<QueryContext> {
-}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java
deleted file mode 100644
index 2702581..0000000
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.server.query;
-
-import org.apache.lens.server.api.query.QueryContext;
-import org.apache.lens.server.api.query.cost.QueryCost;
-
-public class QueryCostComparator extends FIFOQueryComparator {
-
- @Override
- public int compare(final QueryContext o1, final QueryContext o2) {
-
- QueryCost qcO1 = o1.getSelectedDriverQueryCost();
- QueryCost qcO2 = o2.getSelectedDriverQueryCost();
-
- int result = qcO1.compareTo(qcO2);
- if (result == 0) {
- return super.compare(o1, o2);
- }
- return result;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java
index 5d2ddbe..e932672 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java
@@ -24,8 +24,8 @@ import java.util.Map;
import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.server.api.query.QueryContext;
-import org.apache.lens.server.api.query.QueryEnded;
-import org.apache.lens.server.api.query.QueryEvent;
+import org.apache.lens.server.api.query.events.QueryEnded;
+import org.apache.lens.server.api.query.events.QueryEvent;
import org.apache.lens.server.model.LogSegregationContext;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
index 91fddc9..2a34c68 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
@@ -40,7 +40,7 @@ import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.AsyncEventListener;
import org.apache.lens.server.api.metrics.MetricsService;
import org.apache.lens.server.api.query.QueryContext;
-import org.apache.lens.server.api.query.QueryEnded;
+import org.apache.lens.server.api.query.events.QueryEnded;
import org.apache.lens.server.model.LogSegregationContext;
import org.apache.commons.lang3.StringUtils;
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java
index f264603..1760bec 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java
@@ -36,8 +36,8 @@ import org.apache.lens.api.util.MoxyJsonConfigurationContextResolver;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.AsyncEventListener;
import org.apache.lens.server.api.query.QueryContext;
-import org.apache.lens.server.api.query.QueryEnded;
-import org.apache.lens.server.api.query.QueryEvent;
+import org.apache.lens.server.api.query.events.QueryEnded;
+import org.apache.lens.server.api.query.events.QueryEvent;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index cb5961f..b5e996f 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -50,8 +50,6 @@ import org.apache.lens.server.BaseLensService;
import org.apache.lens.server.LensServerConf;
import org.apache.lens.server.LensServices;
import org.apache.lens.server.api.LensConfConstants;
-import org.apache.lens.server.api.common.BackOffRetryHandler;
-import org.apache.lens.server.api.common.OperationRetryHandlerFactory;
import org.apache.lens.server.api.driver.*;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.error.LensMultiCauseException;
@@ -62,8 +60,11 @@ import org.apache.lens.server.api.metrics.MethodMetricsFactory;
import org.apache.lens.server.api.metrics.MetricsService;
import org.apache.lens.server.api.query.*;
import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy;
+import org.apache.lens.server.api.query.comparators.*;
import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
import org.apache.lens.server.api.query.cost.QueryCost;
+import org.apache.lens.server.api.query.events.*;
+import org.apache.lens.server.api.retry.*;
import org.apache.lens.server.api.util.LensUtil;
import org.apache.lens.server.model.LogSegregationContext;
import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext;
@@ -235,7 +236,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
/**
* The query comparator
*/
- private QueryComparator queryComparator;
+ private Comparator<QueryContext> queryComparator;
/**
* The result sets.
*/
@@ -295,7 +296,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
* */
private final ReentrantLock removalFromLaunchedQueriesLock = new ReentrantLock();
- private final ExecutorService waitingQueriesSelectionSvc = Executors.newSingleThreadExecutor();
+ private final ScheduledExecutorService waitingQueriesSelectionSvc = Executors.newSingleThreadScheduledExecutor();
/**
* This is the TTL millis for all result sets of type {@link org.apache.lens.server.api.driver.InMemoryResultSet}
@@ -321,7 +322,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
private UserQueryToCubeQueryRewriter userQueryToCubeQueryRewriter;
// Exponential backoff retry handler for status updates
- private BackOffRetryHandler statusUpdateRetryHandler;
+ private BackOffRetryHandler<StatusUpdateFailureContext> statusUpdateRetryHandler;
+ private RetryPolicyDecider<QueryContext> queryRetryPolicyDecider;
/**
* Instantiates a new query execution service impl.
@@ -376,9 +378,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
*
* @throws LensException the lens exception
*/
- private void loadDriversAndSelector() throws LensException {
- //Load all configured Drivers
- loadDrivers();
+ private void loadDriverSelector() throws LensException {
//Load configured Driver Selector
try {
Class<? extends DriverSelector> driverSelectorClass = conf.getClass(DRIVER_SELECTOR_CLASS,
@@ -394,14 +394,18 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
}
private void loadQueryComparator() throws LensException {
try {
- Class<? extends QueryComparator> queryComparatorClass = conf.getClass(QUERY_COMPARATOR_CLASS,
- QueryPriorityComparator.class, QueryComparator.class);
- log.info("Using query comparator class: {}", queryComparatorClass.getCanonicalName());
- queryComparator = queryComparatorClass.newInstance();
+ Class<?>[] classes = conf.getClasses(QUERY_COMPARATOR_CLASSES,
+ MoreRetriesFirstComparator.class, QueryPriorityComparator.class,
+ FIFOQueryComparator.class, QueryCostComparator.class);
+ List<Comparator<QueryContext>> comparators = Lists.newArrayList();
+ for (Class<?> clazz: classes) {
+ comparators.add(clazz.asSubclass(QueryComparator.class).newInstance());
+ }
+ queryComparator = new ChainedComparator<>(comparators);
} catch (Exception e) {
- throw new LensException("Couldn't instantiate query comparator class. Class name: "
- + conf.get(QUERY_COMPARATOR_CLASS) + ". Please supply a valid value for "
- + QUERY_COMPARATOR_CLASS);
+ throw new LensException("Couldn't instantiate query comparator class. Classes: "
+ + conf.get(QUERY_COMPARATOR_CLASSES) + ". Please supply a valid value for "
+ + QUERY_COMPARATOR_CLASSES);
}
}
@@ -681,9 +685,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
Thread.sleep(100);
continue;
}
- QueryContext query = queuedQueries.take();
+ final QueryContext query = queuedQueries.take();
synchronized (query) {
-
/* Setting log segregation id */
logSegregationContext.setLogSegragationAndQueryId(query.getQueryHandleString());
@@ -714,6 +717,17 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
launched queries. First add to waiting queries, then release lock */
addToWaitingQueries(query);
removalFromLaunchedQueriesLock.unlock();
+ if (query.getRetryPolicy() != null) {
+ waitingQueriesSelectionSvc.schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (waitingQueries.remove(query)) {
+ queuedQueries.add(query);
+ }
+ }
+ }, query.getRetryPolicy().getOperationNextTime(query) - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS);
+ }
}
} finally {
if (removalFromLaunchedQueriesLock.isHeldByCurrentThread()) {
@@ -881,6 +895,43 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
log.info("StatusPoller exited");
}
}
+ private boolean handleRetries(QueryContext ctx) throws LensException {
+ // TODO: handle retries for post-processing, e.g. result formatting failure doesn't need query rerun
+ if (ctx.getStatus().failing()) {
+ if (removeFromLaunchedQueries(ctx)) {
+ processWaitingQueriesAsync(ctx);
+ }
+ if (ctx.getDriverStatus().failed() && !getDriverRetryPolicy(ctx).hasExhaustedRetries(ctx)) {
+ log.info("query {} will be retried on the same driver {}",
+ ctx.getQueryHandle(), ctx.getSelectedDriver().getFullyQualifiedName());
+ ctx.extractFailedAttempt();
+ ctx.setStatus(QueryStatus.getQueuedStatus());
+ ctx.getSelectedDriver().closeQuery(ctx.getQueryHandle());
+ return queuedQueries.add(ctx);
+ } else if (!getServerRetryPolicy(ctx).hasExhaustedRetries(ctx)) {
+ LensDriver selectedDriver = ctx.getSelectedDriver();
+ ctx.getDriverContext().blacklist(selectedDriver);
+ try (SessionContext ignored = new SessionContext(getSessionHandle(ctx.getLensSessionIdentifier()))) {
+ rewriteAndSelect(ctx);
+ } catch (LensException e) {
+ log.error("driver {} gave up on query {} and it will not be retried on any other driver since rewrite failed",
+ selectedDriver.getFullyQualifiedName(), e);
+ ctx.setStatus(new QueryStatus(1.0f, null, FAILED, ctx.getStatus().getStatusMessage(), false, null,
+ ctx.getStatus().getErrorMessage(), ctx.getStatus().getLensErrorTO()));
+ return false;
+ }
+ log.info("driver {} gave up on query {} and it will be retried on {}", selectedDriver.getFullyQualifiedName(),
+ ctx.getQueryHandle(), ctx.getSelectedDriver().getFullyQualifiedName());
+ ctx.extractFailedAttempt(selectedDriver);
+ ctx.setStatus(QueryStatus.getQueuedStatus());
+ selectedDriver.closeQuery(ctx.getQueryHandle());
+ return queuedQueries.add(ctx);
+ }
+ ctx.setStatus(new QueryStatus(1.0f, null, FAILED, ctx.getStatus().getStatusMessage(), false, null,
+ ctx.getStatus().getErrorMessage(), ctx.getStatus().getLensErrorTO()));
+ }
+ return false;
+ }
/**
* Sets the failed status.
@@ -891,13 +942,33 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
* @throws LensException the lens exception
*/
void setFailedStatus(QueryContext ctx, String statusMsg, Exception e) throws LensException {
-
QueryStatus before = ctx.getStatus();
- ctx.setStatus(new QueryStatus(0.0f, null, FAILED, statusMsg, false, null, LensUtil.getCauseMessage(e),
+ ctx.setStatus(new QueryStatus(0.0f, null, FAILING, statusMsg, false, null, LensUtil.getCauseMessage(e),
e instanceof LensException ? ((LensException)e).buildLensErrorTO(this.errorCollection) : null));
- updateFinishedQuery(ctx, before);
+ handleRetries(ctx);
+ if (ctx.finished()) {
+ updateFinishedQuery(ctx, before);
+ }
fireStatusChangeEvent(ctx, ctx.getStatus(), before);
}
+
+ private BackOffRetryHandler<QueryContext> getServerRetryPolicy(QueryContext ctx) {
+ if (ctx.getServerRetryPolicy() == null) {
+ // allow new driver to retry
+ ctx.setDriverRetryPolicy(null);
+ ctx.setServerRetryPolicy(queryRetryPolicyDecider.decidePolicy(ctx.getStatus().getErrorMessage()));
+ }
+ return ctx.getServerRetryPolicy();
+ }
+
+ private BackOffRetryHandler<QueryContext> getDriverRetryPolicy(QueryContext ctx) {
+ if (ctx.getDriverRetryPolicy() == null) {
+ ctx.setDriverRetryPolicy(ctx.getSelectedDriver().getRetryPolicyDecider()
+ .decidePolicy(ctx.getDriverStatus().getErrorMessage()));
+ }
+ return ctx.getDriverRetryPolicy();
+ }
+
/**
* Sets the cancelled status.
*
@@ -984,6 +1055,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
|| !ctx.isResultAvailableInDriver())) {
setSuccessState(ctx);
} else {
+ if (ctx.getStatus().failing()) {
+ handleRetries(ctx);
+ }
if (ctx.getStatus().finished()) {
updateFinishedQuery(ctx, before);
}
@@ -1029,7 +1103,11 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
case LAUNCHED:
return new QueryLaunched(ctx.getLaunchTime(), prevState, currState, query);
case QUEUED:
- return new QueryQueued(ctx.getSubmissionTime(), prevState, currState, query, ctx.getSubmittedUser());
+ if (ctx.getFailedAttempts().size() > 0) {
+ return new QueryQueuedForRetry(ctx.getSubmissionTime(), prevState, currState, query, ctx.getSubmittedUser());
+ } else {
+ return new QueryQueued(ctx.getSubmissionTime(), prevState, currState, query, ctx.getSubmittedUser());
+ }
case RUNNING:
return new QueryRunning(System.currentTimeMillis() - ctx.getDriverStatus().getDriverStartTime(), prevState,
currState, query);
@@ -1239,13 +1317,21 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
throw new IllegalStateException("Could not load phase 1 rewriters");
}
try {
+ loadQueryRetryPolicyDecider(conf);
+ } catch (LensException e) {
+ throw new IllegalStateException("Could not load retry policy", e);
+ }
+ try {
initializeQueryAcceptors();
} catch (LensException e) {
throw new IllegalStateException("Could not load acceptors");
}
initializeListeners();
try {
- loadDriversAndSelector();
+ // Load all configured Drivers
+ loadDrivers();
+ // load driver selector
+ loadDriverSelector();
} catch (LensException e) {
log.error("Error while loading drivers", e);
throw new IllegalStateException("Could not load drivers", e);
@@ -1269,6 +1355,10 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
log.info("Query execution service initialized");
}
+ private void loadQueryRetryPolicyDecider(Configuration conf) throws LensException {
+ this.queryRetryPolicyDecider = ChainedRetryPolicyDecider.from(conf, QUERY_RETRY_POLICY_CLASSES);
+ }
+
/**
* Initalize finished query store.
*
@@ -1279,8 +1369,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
this.lensServerDao.init(conf);
try {
this.lensServerDao.createFinishedQueriesTable();
+ this.lensServerDao.createFailedAttemptsTable();
} catch (Exception e) {
- log.warn("Unable to create finished query table, query purger will not purge queries", e);
+ log.warn("Unable to create finished query tables, query purger will not purge queries", e);
}
}
@@ -1429,15 +1520,16 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
log.info("Recovered {} queries", allQueries.size());
}
super.start();
- querySubmitter.start();
- statusPoller.start();
- queryPurger.start();
- prepareQueryPurger.start();
startEstimatePool();
startLauncherPool();
startQueryCancellationPool();
+ querySubmitter.start();
+ statusPoller.start();
+ queryPurger.start();
+ prepareQueryPurger.start();
+
if (conf.getBoolean(RESULTSET_PURGE_ENABLED, DEFAULT_RESULTSET_PURGE_ENABLED)) {
queryResultPurger = new QueryResultPurger();
queryResultPurger.init(conf);
@@ -1598,8 +1690,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
// Evaluate success of rewrite and estimate
boolean succeededOnce = false;
- List<String> failureCauses = new ArrayList<String>(numDrivers);
- List<LensException> causes = new ArrayList<LensException>(numDrivers);
+ List<String> failureCauses = new ArrayList<>(numDrivers);
+ List<LensException> causes = new ArrayList<>(numDrivers);
for (RewriteEstimateRunnable r : runnables) {
if (r.isSucceeded()) {
@@ -2087,7 +2179,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
private QueryHandle submitQuery(final QueryContext ctx) throws LensException {
synchronized (ctx) {
QueryStatus before = ctx.getStatus();
- ctx.setStatus(new QueryStatus(0.0, null, QUEUED, "Query is queued", false, null, null, null));
+ ctx.setStatus(QueryStatus.getQueuedStatus());
queuedQueries.add(ctx);
log.info("Added to Queued Queries:{}", ctx.getQueryHandleString());
allQueries.put(ctx.getQueryHandle(), ctx);
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
index 55cabe2..557daa2 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
@@ -24,7 +24,7 @@ import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.AsyncEventListener;
import org.apache.lens.server.api.events.LensEventService;
import org.apache.lens.server.api.query.QueryContext;
-import org.apache.lens.server.api.query.QueryEnded;
+import org.apache.lens.server.api.query.events.QueryEnded;
import org.apache.lens.server.stats.event.query.QueryDriverStatistics;
import org.apache.lens.server.stats.event.query.QueryExecutionStatistics;
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java
deleted file mode 100644
index 2c6d904..0000000
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.server.query;
-
-import org.apache.lens.api.Priority;
-import org.apache.lens.server.api.query.QueryContext;
-
-public class QueryPriorityComparator extends FIFOQueryComparator {
-
- @Override
- public int compare(final QueryContext o1, final QueryContext o2) {
-
- Priority pO1 = o1.getPriority();
- Priority pO2 = o2.getPriority();
-
- int result = pO1.compareTo(pO2);
- if (result == 0) {
- return super.compare(o1, o2);
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
index 41cf33b..c7dc0e1 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
@@ -28,6 +28,7 @@ import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.AsyncEventListener;
import org.apache.lens.server.api.metrics.MetricsService;
import org.apache.lens.server.api.query.*;
+import org.apache.lens.server.api.query.events.QueryExecuted;
import org.apache.lens.server.model.LogSegregationContext;
import org.apache.hadoop.fs.FileSystem;
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java b/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java
index 48291b9..cf117dc 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java
@@ -19,11 +19,13 @@
package org.apache.lens.server.query.constraint;
+import java.util.Collections;
import java.util.Set;
import org.apache.lens.server.api.query.QueryContext;
import org.apache.lens.server.api.query.collect.EstimatedImmutableQueryCollection;
import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
+import org.apache.lens.server.api.retry.BackOffRetryHandler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
@@ -44,7 +46,7 @@ public class DefaultQueryLaunchingConstraintsChecker implements QueryLaunchingCo
private final ImmutableSet<QueryLaunchingConstraint> lensQueryConstraints;
public DefaultQueryLaunchingConstraintsChecker(
- @NonNull final ImmutableSet<QueryLaunchingConstraint> lensQueryConstraints) {
+ @NonNull final ImmutableSet<QueryLaunchingConstraint> lensQueryConstraints) {
this.lensQueryConstraints = lensQueryConstraints;
}
@@ -54,8 +56,11 @@ public class DefaultQueryLaunchingConstraintsChecker implements QueryLaunchingCo
Set<QueryLaunchingConstraint> allConstraints = prepareAllConstraints(candidateQuery);
for (QueryLaunchingConstraint queryConstraint : allConstraints) {
- if (!queryConstraint.allowsLaunchOf(candidateQuery, launchedQueries)) {
- log.info("query {} not allowed to launch. Constraint failed: {}", candidateQuery, queryConstraint);
+ String launchRejectionMessage = queryConstraint.allowsLaunchOf(candidateQuery, launchedQueries);
+ if (launchRejectionMessage != null) {
+ log.info("query {} not allowed to launch. Constraint failed: {} with message: {}",
+ candidateQuery, queryConstraint, launchRejectionMessage);
+ candidateQuery.getStatus().setProgressMessage(launchRejectionMessage);
return false;
}
}
@@ -66,6 +71,12 @@ public class DefaultQueryLaunchingConstraintsChecker implements QueryLaunchingCo
Set<QueryLaunchingConstraint> prepareAllConstraints(final QueryContext candidateQuery) {
ImmutableSet<QueryLaunchingConstraint> driverConstraints = candidateQuery.getSelectedDriverQueryConstraints();
- return Sets.union(this.lensQueryConstraints, driverConstraints);
+ BackOffRetryHandler<QueryContext> retryPolicy = candidateQuery.getRetryPolicy();
+ Sets.SetView<QueryLaunchingConstraint> constraints = Sets.union(this.lensQueryConstraints, driverConstraints);
+ if (retryPolicy == null) {
+ return constraints;
+ } else {
+ return Sets.union(Collections.singleton(new RetryPolicyToConstraingAdapter(retryPolicy)), constraints);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java b/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java
new file mode 100644
index 0000000..e0d6d80
--- /dev/null
+++ b/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.query.constraint;
+
+
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.api.query.collect.EstimatedImmutableQueryCollection;
+import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
+import org.apache.lens.server.api.retry.BackOffRetryHandler;
+
+import lombok.Data;
+
+@Data
+public class RetryPolicyToConstraingAdapter implements QueryLaunchingConstraint {
+ private final BackOffRetryHandler<QueryContext> constraint;
+ @Override
+ public String allowsLaunchOf(QueryContext candidateQuery, EstimatedImmutableQueryCollection launchedQueries) {
+ if (!constraint.canTryOpNow(candidateQuery)) {
+ return "Query will be automatically re-attempted in "
+ + (constraint.getOperationNextTime(candidateQuery) - System.currentTimeMillis())/1000 + " seconds";
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java b/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java
index 0a8d4c3..a7ee737 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java
@@ -57,18 +57,20 @@ public class TotalQueryCostCeilingConstraint implements QueryLaunchingConstraint
* @return
*/
@Override
- public boolean allowsLaunchOf(
+ public String allowsLaunchOf(
final QueryContext candidateQuery, final EstimatedImmutableQueryCollection launchedQueries) {
if (!totalQueryCostCeilingPerUser.isPresent()) {
- return true;
+ return null;
}
final String currentUser = candidateQuery.getSubmittedUser();
QueryCost totalQueryCostForCurrentUser = launchedQueries.getTotalQueryCost(currentUser);
- boolean canLaunch = (totalQueryCostForCurrentUser.compareTo(totalQueryCostCeilingPerUser.get()) <= 0);
- log.debug("canLaunch:{}", canLaunch);
- return canLaunch;
+ if (totalQueryCostForCurrentUser.compareTo(totalQueryCostCeilingPerUser.get()) > 0) {
+ return totalQueryCostForCurrentUser + "/" + totalQueryCostCeilingPerUser + " capacity utilized by "
+ + candidateQuery.getSubmittedUser();
+ }
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java b/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java
index 18c2f2c..b2e140b 100644
--- a/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java
+++ b/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java
@@ -299,7 +299,7 @@ public final class RewriteUtil {
Map<LensDriver, DriverRewriterRunnable> runnables = new LinkedHashMap<>();
List<RewriteUtil.CubeQueryInfo> cubeQueries = findCubePositions(replacedQuery, ctx.getHiveConf());
- for (LensDriver driver : ctx.getDriverContext().getDrivers()) {
+ for (LensDriver driver : ctx.getDriverContext().getEligibleDrivers()) {
runnables.put(driver, new DriverRewriterRunnable(driver, ctx, cubeQueries, replacedQuery));
}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
index 4192134..1c642bd 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
@@ -25,7 +25,7 @@ import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.api.scheduler.*;
import org.apache.lens.server.api.events.AsyncEventListener;
import org.apache.lens.server.api.query.QueryContext;
-import org.apache.lens.server.api.query.QueryEnded;
+import org.apache.lens.server.api.query.events.QueryEnded;
import lombok.extern.slf4j.Slf4j;