You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/04/09 20:41:36 UTC
[incubator-heron] 01/01: Merge branch 'master' of https://github.com/apache/incubator-heron into huijunw/healthmgrmetrics
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch huijunw/healthmgrmetrics
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 92c233460d9fc73df4ecc5ed90e9b4c51b3c7e76
Merge: 2e77607 cc4b2d7
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Mon Apr 9 13:41:09 2018 -0700
Merge branch 'master' of https://github.com/apache/incubator-heron into huijunw/healthmgrmetrics
.gitignore | 3 +
.gitmodules | 2 +-
README.md | 5 +-
WORKSPACE | 89 +-
.../src/java/BUILD | 2 +-
.../twitter/heron/examples/eco/EvenAndOddBolt.java | 14 +-
.../twitter/heron/examples/eco/LogInfoBolt.java | 8 +-
.../twitter/heron/examples/eco/RandomString.java | 51 +
.../heron/examples/eco/StatefulConsumerBolt.java | 66 +
.../heron/examples/eco/StatefulNumberSpout.java | 83 +
.../heron/examples/eco/StatefulRandomIntSpout.java | 73 +
.../heron/examples/eco/StatefulWindowSumBolt.java | 67 +
.../heron/examples/eco/TestFibonacciSpout.java | 15 +-
.../heron/examples/eco/TestIBasicPrintBolt.java | 14 +-
.../heron/examples/eco/TestNameCounter.java | 12 +-
.../twitter/heron/examples/eco/TestNameSpout.java | 18 +-
.../twitter/heron/examples/eco/TestPrintBolt.java | 8 +-
.../heron/examples/eco/TestPropertyHolder.java | 0
.../com/twitter/heron/examples/eco/TestUnits.java | 0
.../twitter/heron/examples/eco/TestWindowBolt.java | 14 +-
.../com/twitter/heron/examples/eco/WordSpout.java | 64 +
.../examples/eco/heron-stateful-windowing.yaml | 43 +-
.../examples/eco/heron-stateful-word-count.yaml | 38 +
.../heron/examples/eco/heron_fibonacci.yaml | 5 +-
.../heron/examples/eco/heron_windowing.yaml | 9 +-
.../heron/examples/eco/heron_wordcount.yaml | 5 +-
.../twitter/heron/examples/eco/sample.properties | 0
.../src/java/BUILD | 10 +-
.../twitter/heron/examples/eco/EvenAndOddBolt.java | 0
.../twitter/heron/examples/eco/LogInfoBolt.java | 0
.../heron/examples/eco/TestFibonacciSpout.java | 0
.../heron/examples/eco/TestIBasicPrintBolt.java | 0
.../heron/examples/eco/TestNameCounter.java | 0
.../twitter/heron/examples/eco/TestNameSpout.java | 0
.../twitter/heron/examples/eco/TestPrintBolt.java | 0
.../heron/examples/eco/TestPropertyHolder.java | 0
.../com/twitter/heron/examples/eco/TestUnits.java | 0
.../twitter/heron/examples/eco/TestWindowBolt.java | 0
.../com/twitter/heron/examples/eco/fibonacci.yaml | 0
.../twitter/heron/examples/eco/sample.properties | 0
.../heron/examples/eco/simple_windowing.yaml | 0
.../heron/examples/eco/simple_wordcount.yaml | 0
eco/src/java/BUILD | 53 +-
eco/src/java/com/twitter/heron/eco/Eco.java | 87 +-
.../com/twitter/heron/eco/builder/BoltBuilder.java | 2 +-
.../heron/eco/builder/ComponentBuilder.java | 2 +-
.../twitter/heron/eco/builder/ConfigBuilder.java | 7 +-
.../heron/eco/builder/{ => heron}/EcoBuilder.java | 11 +-
.../eco/builder/{ => heron}/SpoutBuilder.java | 8 +-
.../eco/builder/{ => heron}/StreamBuilder.java | 20 +-
.../heron/eco/builder/{ => storm}/EcoBuilder.java | 7 +-
.../eco/builder/{ => storm}/SpoutBuilder.java | 4 +-
.../eco/builder/{ => storm}/StreamBuilder.java | 4 +-
.../eco/definition/EcoTopologyDefinition.java | 18 +-
.../heron/eco/definition/ObjectDefinition.java | 2 +-
.../com/twitter/heron/eco/submit/EcoSubmitter.java | 18 +-
eco/tests/java/BUILD | 39 +-
eco/tests/java/com/twitter/heron/eco/EcoTest.java | 48 +-
.../HeronEcoBuilderTest.java} | 12 +-
.../HeronSpoutBuilderTest.java} | 16 +-
.../HeronStreamBuilderTest.java} | 46 +-
.../StormEcoBuilderTest.java} | 9 +-
.../StormSpoutBuilderTest.java} | 6 +-
.../StormStreamBuilderTest.java} | 6 +-
.../twitter/heron/eco/parser/EcoParserTest.java | 2 +
.../twitter/heron/eco/submit/EcoSubmitterTest.java | 24 +-
.../scala/ScalaClassicalMusicTopology.scala | 105 +
.../scala/ScalaIntegerProcessingTopology.scala | 21 +-
.../streamlet/scala/ScalaRepartitionTopology.scala | 82 +
.../scala/ScalaTransformsAndCloneTopology.scala | 105 +
.../scala/ScalaWindowedWordCountTopology.scala | 80 +
.../scala/common/ScalaTopologyExampleUtils.scala | 29 +-
.../twitter/heron/streamlet/impl/BuilderImpl.java | 6 +-
.../streamlet/impl/operators/JoinOperator.java | 2 +-
.../streamlet/scala/SerializableTransformer.scala | 2 -
.../heron/streamlet/scala/impl/BuilderImpl.scala | 3 +
heron/common/src/cpp/basics/BUILD | 6 +-
.../cpp/config/heron-internals-config-reader.cpp | 5 +
.../src/cpp/config/heron-internals-config-reader.h | 3 +
.../src/cpp/config/heron-internals-config-vars.cpp | 2 +
.../src/cpp/config/heron-internals-config-vars.h | 3 +
.../src/cpp/config/topology-config-helper.cpp | 106 +-
.../common/src/cpp/config/topology-config-helper.h | 31 +-
heron/common/src/cpp/metrics/BUILD | 7 +-
heron/common/src/cpp/network/BUILD | 2 +-
heron/common/src/cpp/network/baseconnection.cpp | 2 +-
heron/common/src/cpp/zookeeper/BUILD | 2 +-
.../heron/common/basics/DryRunFormatType.java | 3 +-
.../twitter/heron/common/network/HeronClient.java | 2 +-
heron/common/tests/cpp/config/BUILD | 18 +
.../cpp/config/topology-config-helper_unittest.cpp | 242 ++
.../src/yaml/conf/aurora/heron_internals.yaml | 2 +-
.../src/yaml/conf/examples/heron_internals.yaml | 2 +-
.../src/yaml/conf/kubernetes/heron_internals.yaml | 2 +-
.../src/yaml/conf/local/heron_internals.yaml | 2 +-
.../src/yaml/conf/localzk/heron_internals.yaml | 2 +-
.../src/yaml/conf/marathon/heron_internals.yaml | 2 +-
.../src/yaml/conf/mesos/heron_internals.yaml | 2 +-
.../src/yaml/conf/nomad/heron_internals.yaml | 2 +-
heron/config/src/yaml/conf/nomad/heron_nomad.sh | 3 +
.../config/src/yaml/conf/nomad/metrics_sinks.yaml | 3 +-
heron/config/src/yaml/conf/nomad/scheduler.yaml | 17 +
.../src/yaml/conf/sandbox/heron_internals.yaml | 2 +-
.../src/yaml/conf/slurm/heron_internals.yaml | 2 +-
.../src/yaml/conf/standalone/heron_internals.yaml | 2 +-
.../config/src/yaml/conf/standalone/heron_nomad.sh | 3 +
.../src/yaml/conf/standalone/metrics_sinks.yaml | 3 +-
.../config/src/yaml/conf/standalone/scheduler.yaml | 14 +
.../standalone/templates/scheduler.template.yaml | 14 +
.../config/src/yaml/conf/yarn/heron_internals.yaml | 2 +-
.../twitter/heron/downloader/FileDownloader.java | 2 +-
heron/executor/src/python/heron_executor.py | 3 +-
.../tests/python/heron_executor_unittest.py | 2 +-
heron/healthmgr/src/java/BUILD | 3 +
.../healthmgr/common/ComponentMetricsHelper.java | 111 -
.../healthmgr/common/HealthManagerEvents.java | 13 +-
.../healthmgr/common/PhysicalPlanProvider.java | 6 +-
.../healthmgr/detectors/BackPressureDetector.java | 59 +-
.../heron/healthmgr/detectors/BaseDetector.java | 17 +-
.../detectors/GrowingWaitQueueDetector.java | 79 +-
.../detectors/LargeWaitQueueDetector.java | 64 +-
.../detectors/ProcessingRateSkewDetector.java | 11 +-
.../heron/healthmgr/detectors/SkewDetector.java | 101 +-
...ityDetector.java => WaitQueueSkewDetector.java} | 15 +-
.../heron/healthmgr/diagnosers/BaseDiagnoser.java | 60 +-
.../healthmgr/diagnosers/DataSkewDiagnoser.java | 95 +-
.../diagnosers/SlowInstanceDiagnoser.java | 95 +-
.../diagnosers/UnderProvisioningDiagnoser.java | 66 +-
.../AutoRestartBackpressureContainerPolicy.java | 52 +-
.../policy/DynamicResourceAllocationPolicy.java | 46 +-
.../resolvers/RestartContainerResolver.java | 105 +-
.../heron/healthmgr/resolvers/ScaleUpResolver.java | 122 +-
.../healthmgr/sensors/BackPressureSensor.java | 78 +-
.../heron/healthmgr/sensors/BaseSensor.java | 20 +-
.../heron/healthmgr/sensors/BufferSizeSensor.java | 90 +-
.../healthmgr/sensors/ExecuteCountSensor.java | 22 +-
.../sensors/MetricsCacheMetricsProvider.java | 86 +-
.../healthmgr/sensors/TrackerMetricsProvider.java | 73 +-
.../com/twitter/heron/healthmgr/TestUtils.java | 69 -
.../common/ComponentMetricsHelperTest.java | 70 -
.../healthmgr/common/PackingPlanProviderTest.java | 4 +-
.../healthmgr/common/TopologyProviderTest.java | 2 +-
.../detectors/BackPressureDetectorTest.java | 73 +-
.../detectors/GrowingWaitQueueDetectorTest.java | 114 +-
.../detectors/LargeWaitQueueDetectorTest.java | 56 +-
.../detectors/ProcessingRateSkewDetectorTest.java | 203 +-
.../detectors/WaitQueueDisparityDetectorTest.java | 71 -
.../detectors/WaitQueueSkewDetectorTest.java | 88 +
.../diagnosers/DataSkewDiagnoserTest.java | 124 +-
.../diagnosers/SlowInstanceDiagnoserTest.java | 98 +-
.../diagnosers/UnderProvisioningDiagnoserTest.java | 82 +-
.../healthmgr/resolvers/ScaleUpResolverTest.java | 87 +-
.../healthmgr/sensors/BackPressureSensorTest.java | 55 +-
.../healthmgr/sensors/BufferSizeSensorTest.java | 55 +-
.../healthmgr/sensors/ExecuteCountSensorTest.java | 70 +-
.../sensors/MetricsCacheMetricsProviderTest.java | 162 +-
.../sensors/TrackerMetricsProviderTest.java | 153 +-
.../src/cpp/slave/outgoing-tuple-collection.h | 2 +-
.../instance/src/cpp/spoutimpl/spout-instance.cpp | 8 +-
heron/instance/src/cpp/spoutimpl/spout-instance.h | 2 +
.../cpp/spoutimpl/spout-output-collector-impl.h | 1 +
.../heron/instance/AbstractOutputCollector.java | 7 +
.../com/twitter/heron/instance/HeronInstance.java | 2 +-
.../heron/instance/spout/SpoutInstance.java | 8 +
.../java/com/twitter/heron/resource/Constants.java | 2 +-
.../heron/metricsmgr/sink/AbstractWebSink.java | 7 +-
.../sink/metricscache/MetricsCacheSinkTest.java | 3 +-
.../metricsmgr/sink/tmaster/TMasterSinkTest.java | 3 +-
.../roundrobin/ResourceCompliantRRPacking.java | 4 +-
heron/scheduler-core/src/java/BUILD | 15 +-
.../heron/scheduler/RuntimeManagerMain.java | 8 +
.../heron/scheduler/dryrun/JsonFormatterUtils.java | 87 +
.../scheduler/dryrun/SubmitJsonDryRunRenderer.java | 44 +
.../scheduler/dryrun/UpdateJsonDryRunRenderer.java | 57 +
.../heron/scheduler/utils/DryRunRenders.java | 7 +
heron/scheduler-core/tests/java/BUILD | 71 +-
.../scheduler/dryrun/JsonFormatterUtilsTest.java | 70 +
.../resources/JsonFormatterUtilsExpectedJson.txt | 1 +
heron/schedulers/src/java/BUILD | 1 +
.../heron/scheduler/nomad/NomadConstants.java | 5 +
.../heron/scheduler/nomad/NomadContext.java | 40 +-
.../heron/scheduler/nomad/NomadScheduler.java | 100 +-
.../heron/scheduler/nomad/NomadSchedulerTest.java | 113 +-
.../java/com/twitter/heron/spi/common/Context.java | 4 +
.../src/java/com/twitter/heron/spi/common/Key.java | 4 +
.../twitter/heron/spi/common/ConfigLoaderTest.java | 2 +-
heron/stmgr/src/cpp/manager/instance-server.cpp | 2 +
heron/tmaster/src/cpp/manager/tcontroller.cpp | 217 +-
heron/tmaster/src/cpp/manager/tcontroller.h | 35 +
heron/tmaster/src/cpp/manager/tmaster.cpp | 109 +-
heron/tmaster/src/cpp/manager/tmaster.h | 40 +-
heron/tmaster/tests/cpp/server/BUILD | 23 +
.../tests/cpp/server/tcontroller_unittest.cpp | 48 +
.../tmaster/tests/cpp/server/tmaster_unittest.cpp | 293 +-
.../twitter/heron/apiserver/utils/ConfigUtils.java | 10 +-
heron/tools/apiserver/src/shell/heron-apiserver.sh | 8 +-
heron/tools/cli/src/python/args.py | 2 +-
heron/tools/cli/src/python/update.py | 6 +-
heron/uploaders/src/java/BUILD | 4 +-
.../twitter/heron/uploader/http/HttpUploader.java | 66 +-
heron/uploaders/tests/java/BUILD | 2 +-
.../heron/uploader/http/HttpUploaderTest.java | 97 +
.../localfs/LocalFileSystemConstantsTest.java | 2 +-
integration_test/src/python/test_runner/main.py | 8 +-
.../src/python/test_runner/resources/test.json | 7 +
integration_test/src/scala/BUILD | 28 +
.../common/ScalaIntegrationTestBase.scala | 32 +
.../ScalaStreamletWithFilterAndTransform.scala | 71 +
...calaStreamletWithFilterAndTransformResults.json | 1 +
scripts/applatix/javatests.sh | 19 +-
scripts/applatix/test.sh | 19 +-
scripts/get_all_heron_paths.sh | 4 +-
scripts/packages/BUILD | 11 +
scripts/release/status.sh | 2 +-
scripts/run_integration_test.sh | 9 +
scripts/travis/build.sh | 6 +-
scripts/travis/test.sh | 19 +-
third_party/cereal/{cereal.BUILD => BUILD} | 53 +-
third_party/cereal/cereal-1.2.1.tar.gz | Bin 0 -> 301689 bytes
third_party/cereal/cereal.BUILD | 101 -
third_party/cereal/empty.cc | 0
third_party/glog/empty.cc | 2 -
third_party/glog/glog-0.3.5.tar.gz | Bin 532275 -> 0 bytes
third_party/glog/{BUILD => glog.BUILD} | 45 +-
third_party/gperftools/empty.cc | 2 -
third_party/gperftools/gperftools-2.4.tar.gz | Bin 1346075 -> 0 bytes
third_party/gperftools/{BUILD => gperftools.BUILD} | 62 +-
third_party/libevent/empty.cc | 2 -
third_party/libevent/libevent-2.1.8-stable.tar.gz | Bin 1026485 -> 0 bytes
third_party/libevent/{BUILD => libevent.BUILD} | 24 +-
third_party/libunwind/BUILD | 76 +-
third_party/libunwind/empty.cc | 2 -
third_party/libunwind/libunwind-1.1.tar.gz | Bin 1098603 -> 0 bytes
third_party/libunwind/{BUILD => libunwind.BUILD} | 44 +-
third_party/zookeeper/empty.cc | 2 -
third_party/zookeeper/zookeeper-3.4.10.tar.gz | Bin 35042811 -> 0 bytes
third_party/zookeeper/{BUILD => zookeeper.BUILD} | 20 +-
tools/rules/heron_examples.bzl | 6 +-
website/.gitignore | 2 +-
website/Makefile | 14 +-
website/config.yaml | 3 +-
website/content/docs/concepts/architecture.md | 68 +-
.../docs/developers/java/streamlet-api.mmark | 32 +-
.../operators/deployment/{index.md => _index.md} | 0
.../deployment/uploaders/{index.md => _index.md} | 0
.../observability/{index.md => _index.md} | 0
website/gulpfile.js | 32 +-
website/layouts/docs/{single.ace => list.ace} | 7 +-
website/layouts/docs/single.ace | 7 +-
website/layouts/index.ace | 1 -
website/layouts/partials/css.includes.ace | 7 +-
website/package.json | 3 +
website/public | 2 +-
website/yarn.lock | 3622 ++++++++++++++++++++
254 files changed, 8878 insertions(+), 2492 deletions(-)
diff --cc heron/executor/src/python/heron_executor.py
index f79dbfa,3f36e5d..b97dd67
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@@ -498,11 -498,7 +498,10 @@@ class HeronExecutor(object)
"--cluster", self.cluster,
"--role", self.role,
"--environment", self.environment,
- "--topology_name", self.topology_name]
+ "--topology_name", self.topology_name,
+ "--metricsmgr_port", self.metrics_manager_port,
+ "--system_config_file", self.heron_internals_config_file,
- "--override_config_file", self.override_config_file,
- "--verbose"]
++ "--override_config_file", self.override_config_file]
return healthmgr_cmd
diff --cc heron/executor/tests/python/heron_executor_unittest.py
index 5167da4,5b661e7..dab237e
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@@ -131,9 -131,7 +131,9 @@@ class HeronExecutorTest(unittest.TestCa
"-Xloggc:log-files/gc.healthmgr.log -Djava.net.preferIPv4Stack=true " \
"-cp scheduler_classpath:healthmgr_classpath " \
"com.twitter.heron.healthmgr.HealthManager --cluster cluster --role role " \
- "--environment environ --topology_name topname"
+ "--environment environ --topology_name topname --metricsmgr_port metricsmgr_port " \
- "--system_config_file %s --override_config_file %s --verbose" %\
++ "--system_config_file %s --override_config_file %s" %\
+ (INTERNAL_CONF_PATH, OVERRIDE_PATH)
def get_expected_instance_command(component_name, instance_id, container_id):
instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
diff --cc heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
index 9fd6d72,673c3b6..b1fa4a2
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
@@@ -14,46 -14,32 +14,39 @@@
package com.twitter.heron.healthmgr.diagnosers;
- import java.util.List;
- import java.util.Map;
+ import java.time.Instant;
+ import java.util.ArrayList;
+ import java.util.Collection;
import java.util.logging.Logger;
- import com.microsoft.dhalion.detector.Symptom;
- import com.microsoft.dhalion.diagnoser.Diagnosis;
- import com.microsoft.dhalion.metrics.ComponentMetrics;
- import com.microsoft.dhalion.metrics.InstanceMetrics;
+ import com.microsoft.dhalion.core.Diagnosis;
+ import com.microsoft.dhalion.core.MeasurementsTable;
+ import com.microsoft.dhalion.core.Symptom;
+ import com.microsoft.dhalion.core.SymptomsTable;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.healthmgr.HealthManagerMetrics;
- import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
- import com.twitter.heron.healthmgr.common.MetricsStats;
+
- import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
- import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
- import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
- import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+ import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+ import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+ import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+ import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+ import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
public class SlowInstanceDiagnoser extends BaseDiagnoser {
+ public static final String SLOW_INSTANCE_DIAGNOSER = "SlowInstanceDiagnoser";
private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName());
@Override
- public Diagnosis diagnose(List<Symptom> symptoms) {
+ public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+ ((HealthManagerMetrics) SingletonRegistry.INSTANCE
+ .getSingleton(HealthManagerMetrics.METRICS_THREAD))
- .executeDiagnoserIncr(SLOW_INSTANCE_DIAGNOSER);
++ .executeDiagnoserIncr(SLOW_INSTANCE_DIAGNOSER);
+ Collection<Diagnosis> diagnoses = new ArrayList<>();
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
- List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
- Map<String, ComponentMetrics> processingRateSkewComponents =
- getProcessingRateSkewComponents(symptoms);
- Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
-
- if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty()
- || !processingRateSkewComponents.isEmpty()) {
- // Since there is no back pressure or disparate wait count or similar
- // execution count, no action is needed
- return null;
- } else if (bpSymptoms.size() > 1) {
+ SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+ if (bp.size() > 1) {
// TODO handle cases where multiple detectors create back pressure symptom
throw new IllegalStateException("Multiple back-pressure symptoms case");
}
diff --cc heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
index 18fa71c,7bc7833..f780912
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
@@@ -21,30 -24,22 +24,25 @@@ import javax.inject.Inject
import javax.inject.Named;
import com.microsoft.dhalion.api.IResolver;
- import com.microsoft.dhalion.detector.Symptom;
- import com.microsoft.dhalion.diagnoser.Diagnosis;
+ import com.microsoft.dhalion.core.Action;
+ import com.microsoft.dhalion.core.Diagnosis;
+ import com.microsoft.dhalion.core.SymptomsTable;
import com.microsoft.dhalion.events.EventManager;
- import com.microsoft.dhalion.metrics.InstanceMetrics;
- import com.microsoft.dhalion.resolver.Action;
+ import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.healthmgr.HealthManagerMetrics;
- import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
- import com.twitter.heron.healthmgr.common.PhysicalPlanProvider;
import com.twitter.heron.proto.scheduler.Scheduler.RestartTopologyRequest;
import com.twitter.heron.scheduler.client.ISchedulerClient;
import static com.twitter.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME;
- import static com.twitter.heron.healthmgr.detectors.BackPressureDetector.CONF_NOISE_FILTER;
- import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
- import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+ import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
public class RestartContainerResolver implements IResolver {
+ public static final String RESTART_CONTAINER_RESOLVER = "RestartContainerResolver";
private static final Logger LOG = Logger.getLogger(RestartContainerResolver.class.getName());
- private final PhysicalPlanProvider physicalPlanProvider;
private final EventManager eventManager;
private final String topologyName;
private final ISchedulerClient schedulerClient;
@@@ -61,57 -55,54 +58,59 @@@
}
@Override
- public List<Action> resolve(List<Diagnosis> diagnosis) {
+ public void initialize(ExecutionContext ctxt) {
+ this.context = ctxt;
+ }
+
+ @Override
+ public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
+ HealthManagerMetrics hmm = (HealthManagerMetrics) SingletonRegistry.INSTANCE
+ .getSingleton(HealthManagerMetrics.METRICS_THREAD);
+ hmm.executeResolver(RESTART_CONTAINER_RESOLVER);
+
List<Action> actions = new ArrayList<>();
- for (Diagnosis diagnoses : diagnosis) {
- Symptom bpSymptom = diagnoses.getSymptoms().get(SYMPTOM_SLOW_INSTANCE.text());
- if (bpSymptom == null || bpSymptom.getComponents().isEmpty()) {
- // nothing to fix as there is no back pressure
- continue;
- }
-
- if (bpSymptom.getComponents().size() > 1) {
- throw new UnsupportedOperationException("Multiple components with back pressure symptom");
- }
-
- // want to know which stmgr has backpressure
- String stmgrId = null;
- for (InstanceMetrics im : bpSymptom.getComponent().getMetrics().values()) {
- if (im.hasMetricAboveLimit(METRIC_BACK_PRESSURE.text(), noiseFilterMillis)) {
- String instanceId = im.getName();
- int fromIndex = instanceId.indexOf('_') + 1;
- int toIndex = instanceId.indexOf('_', fromIndex);
- stmgrId = instanceId.substring(fromIndex, toIndex);
- break;
- }
- }
+ // find all back pressure measurements reported in this execution cycle
+ Instant current = context.checkpoint();
+ Instant previous = context.previousCheckpoint();
+ SymptomsTable bpSymptoms = context.symptoms()
+ .type(SYMPTOM_INSTANCE_BACK_PRESSURE.text())
+ .between(previous, current);
+
+ if (bpSymptoms.size() == 0) {
+ LOG.fine("No back-pressure measurements found, ending as there's nothing to fix");
+ return actions;
+ }
+
+ Collection<String> allBpInstances = new HashSet<>();
+ bpSymptoms.get().forEach(symptom -> allBpInstances.addAll(symptom.assignments()));
+
+ LOG.info(String.format("%d instances caused back-pressure", allBpInstances.size()));
+
+ Collection<String> stmgrIds = new HashSet<>();
+ allBpInstances.forEach(instanceId -> {
+ LOG.info("Id of instance causing back-pressure: " + instanceId);
+ int fromIndex = instanceId.indexOf('_') + 1;
+ int toIndex = instanceId.indexOf('_', fromIndex);
+ String stmgrId = instanceId.substring(fromIndex, toIndex);
+ stmgrIds.add(stmgrId);
+ });
+
+ stmgrIds.forEach(stmgrId -> {
LOG.info("Restarting container: " + stmgrId);
boolean b = schedulerClient.restartTopology(
RestartTopologyRequest.newBuilder()
- .setContainerIndex(Integer.valueOf(stmgrId))
- .setTopologyName(topologyName)
- .build());
+ .setContainerIndex(Integer.valueOf(stmgrId))
+ .setTopologyName(topologyName)
+ .build());
LOG.info("Restarted container result: " + b);
+ hmm.executeIncr("RestartContainer");
+ });
- ContainerRestart action = new ContainerRestart();
- LOG.info("Broadcasting container restart event");
- eventManager.onEvent(action);
-
- actions.add(action);
- return actions;
- }
-
+ LOG.info("Broadcasting container restart event");
+ ContainerRestart action = new ContainerRestart(current, stmgrIds);
+ eventManager.onEvent(action);
+ actions.add(action);
return actions;
}
-
- @Override
- public void close() {
- }
}
diff --cc heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
index faa0441,76829cf..6d7c740
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
@@@ -24,11 -23,9 +23,11 @@@ import java.util.Collection
import javax.inject.Inject;
import com.microsoft.dhalion.api.MetricsProvider;
- import com.microsoft.dhalion.metrics.ComponentMetrics;
- import com.microsoft.dhalion.metrics.InstanceMetrics;
+ import com.microsoft.dhalion.core.Measurement;
+ import com.microsoft.dhalion.core.MeasurementsTable;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.healthmgr.HealthManagerMetrics;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.common.PackingPlanProvider;
import com.twitter.heron.healthmgr.common.TopologyProvider;
@@@ -36,9 -33,6 +35,8 @@@
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
public class BackPressureSensor extends BaseSensor {
+ public static final String BACKPRESSURE_SENSOR = "BackPressureSensor";
- private static final Logger LOG = Logger.getLogger(BackPressureSensor.class.getName());
+
private final MetricsProvider metricsProvider;
private final PackingPlanProvider packingPlanProvider;
private final TopologyProvider topologyProvider;
@@@ -57,32 -51,24 +55,28 @@@
/**
* Computes the average (millis/sec) back-pressure caused by instances in the configured window
*
- * @return the average value
+ * @return the average value measurements
*/
- public Map<String, ComponentMetrics> get() {
+ @Override
+ public Collection<Measurement> fetch() {
+ ((HealthManagerMetrics) SingletonRegistry.INSTANCE
+ .getSingleton(HealthManagerMetrics.METRICS_THREAD))
- .executeSensorIncr(BACKPRESSURE_SENSOR);
++ .executeSensorIncr(BACKPRESSURE_SENSOR);
+
- Map<String, ComponentMetrics> result = new HashMap<>();
+ Collection<Measurement> result = new ArrayList<>();
+ Instant now = context.checkpoint();
String[] boltComponents = topologyProvider.getBoltNames();
- for (String boltComponent : boltComponents) {
- String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(boltComponent);
+ Duration duration = getDuration();
+ for (String component : boltComponents) {
+ String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(component);
- Duration duration = getDuration();
- Map<String, InstanceMetrics> instanceMetrics = new HashMap<>();
- for (String boltInstanceName : boltInstanceNames) {
- String metric = getMetricName() + boltInstanceName;
- Map<String, ComponentMetrics> stmgrResult = metricsProvider.getComponentMetrics(
- metric, duration, COMPONENT_STMGR);
+ for (String instance : boltInstanceNames) {
+ String metric = getMetricName() + instance;
- if (stmgrResult.get(COMPONENT_STMGR) == null) {
+ Collection<Measurement> stmgrResult
+ = metricsProvider.getMeasurements(now, duration, metric, COMPONENT_STMGR);
+ if (stmgrResult.isEmpty()) {
continue;
}
--
To stop receiving notification emails like this one, please contact
huijun@apache.org.