You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/03 14:23:44 UTC
[07/35] brooklyn-server git commit: remove soft refs in tasks in
favour of lookup
remove soft refs in tasks in favour of lookup
soft refs are expensive in the GC phase -- improves speed by 20% in some cases
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/9a105e05
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/9a105e05
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/9a105e05
Branch: refs/heads/master
Commit: 9a105e05dbe0d8019a7b4f3c3abd523e66a24a84
Parents: 784bc96
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Sep 11 18:03:37 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Sep 15 10:29:08 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/brooklyn/api/mgmt/Task.java | 2 ++
.../util/core/task/BasicExecutionManager.java | 36 ++++++++++++++++++--
.../brooklyn/util/core/task/BasicTask.java | 31 ++++++++++-------
.../brooklyn/util/core/task/ForwardingTask.java | 11 ++++++
.../brooklyn/util/core/task/TaskInternal.java | 2 ++
5 files changed, 68 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9a105e05/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
index ce7a96d..969c3a8 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
@@ -55,6 +55,8 @@ public interface Task<T> extends ListenableFuture<T>, TaskAdaptable<T> {
/** task which submitted this task, if was submitted by a task */
public Task<?> getSubmittedByTask();
+ /** task which submitted this task, if was submitted by a task */
+ public String getSubmittedByTaskId();
/** The thread where the task is running, if it is running. */
public Thread getThread();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9a105e05/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index aa61a57..afc8c99 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -57,6 +57,7 @@ import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
@@ -69,8 +70,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -729,7 +732,9 @@ public class BasicExecutionManager implements ExecutionManager {
incompleteTaskIds.add(task.getId());
Task<?> currentTask = Tasks.current();
- if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask);
+ if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(
+ // do this instead of soft reference (2017-09) as soft refs impact GC
+ Maybe.of(new TaskLookup(this, currentTask)) );
((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis());
if (flags!=null && flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag"));
@@ -742,6 +747,33 @@ public class BasicExecutionManager implements ExecutionManager {
tasksById.put(task.getId(), task);
totalTaskCount.incrementAndGet();
}
+
+ private static class TaskLookup implements Supplier<Task<?>> {
+ // this class is not meant to be serialized, but if it is, make sure exec mgr doesn't sneak in
+ transient BasicExecutionManager mgr;
+
+ String id;
+ String displayName;
+ public TaskLookup(BasicExecutionManager mgr, Task<?> t) {
+ this.mgr = mgr;
+ id = t.getId();
+ displayName = t.getDisplayName();
+ }
+ @Override
+ public Task<?> get() {
+ if (mgr==null) return gone();
+ Task<?> result = mgr.getTask(id);
+ if (result!=null) return result;
+ return gone();
+ }
+ private <T> Task<T> gone() {
+ Task<T> t = Tasks.<T>builder().dynamic(false).displayName(displayName)
+ .description("Details of the original task "+id+" have been forgotten.")
+ .body(Callables.returning((T)null)).build();
+ ((BasicTask<T>)t).ignoreIfNotRun();
+ return t;
+ }
+ }
protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) {
internalBeforeStart(flags, task, true);
@@ -856,7 +888,7 @@ public class BasicExecutionManager implements ExecutionManager {
if (startedInThisThread) {
PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
//clear thread _after_ endTime set, so we won't get a null thread when there is no end-time
- if (RENAME_THREADS && startedInThisThread) {
+ if (RENAME_THREADS) {
Thread thread = task.getThread();
if (thread==null) {
log.warn("BasicTask.afterEnd invoked without corresponding beforeStart");
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9a105e05/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
index 4398675..f1a066f 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
@@ -43,10 +43,12 @@ import java.util.concurrent.TimeoutException;
import org.apache.brooklyn.api.mgmt.HasTaskChildren;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.objs.Identifiable;
import org.apache.brooklyn.util.JavaGroovyEquivalents;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.guava.Maybe.MaybeSupplier;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
@@ -57,10 +59,10 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.base.Objects;
+import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
@@ -256,6 +258,17 @@ public class BasicTask<T> implements TaskInternal<T> {
if (submittedByTask==null) return null;
return submittedByTask.orNull();
}
+ @Override
+ public String getSubmittedByTaskId() {
+ if (submittedByTask==null || submittedByTask.isAbsent()) return null;
+ if (submittedByTask instanceof MaybeSupplier) {
+ Supplier<?> supplier = ((MaybeSupplier<?>)submittedByTask).getSupplier();
+ if (supplier instanceof Identifiable) {
+ return ((Identifiable)supplier).getId();
+ }
+ }
+ return submittedByTask.get().getId();
+ }
/** the thread where the task is running, if it is running */
@Override
@@ -906,19 +919,13 @@ public class BasicTask<T> implements TaskInternal<T> {
submitTimeUtc = val;
}
- private static <T> Task<T> newGoneTaskFor(Task<?> task) {
- Task<T> t = Tasks.<T>builder().dynamic(false).displayName(task.getDisplayName())
- .description("Details of the original task "+task+" have been forgotten.")
- .body(Callables.returning((T)null)).build();
- ((BasicTask<T>)t).ignoreIfNotRun();
- return t;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void setSubmittedByTask(Task<?> task) {
- // possible optimization, store details but don't create task until later
- submittedByTask = Maybe.softThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task)));
+ setSubmittedByTask(Maybe.of(task));
+ }
+ @Override
+ public void setSubmittedByTask(Maybe<Task<?>> taskM) {
+ submittedByTask = taskM;
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9a105e05/core/src/main/java/org/apache/brooklyn/util/core/task/ForwardingTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ForwardingTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ForwardingTask.java
index 7233ea8..83f2128 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ForwardingTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ForwardingTask.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.time.Duration;
import com.google.common.base.Function;
@@ -118,6 +119,11 @@ public abstract class ForwardingTask<T> extends ForwardingObject implements Task
}
@Override
+ public String getSubmittedByTaskId() {
+ return delegate().getSubmittedByTaskId();
+ }
+
+ @Override
public Thread getThread() {
return delegate().getThread();
}
@@ -303,6 +309,11 @@ public abstract class ForwardingTask<T> extends ForwardingObject implements Task
}
@Override
+ public void setSubmittedByTask(Maybe<Task<?>> taskM) {
+ delegate().setSubmittedByTask(taskM);
+ }
+
+ @Override
public Set<Object> getMutableTags() {
return delegate().getMutableTags();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9a105e05/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
index 83b9eab..6b3365b 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.time.Duration;
import com.google.common.annotations.Beta;
@@ -113,6 +114,7 @@ public interface TaskInternal<T> extends Task<T> {
void setSubmitTimeUtc(long currentTimeMillis);
void setSubmittedByTask(Task<?> task);
+ void setSubmittedByTask(Maybe<Task<?>> task);
Set<Object> getMutableTags();