You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2019/06/14 23:01:26 UTC
[hadoop] branch trunk updated: YARN-8499 ATSv2 Generalize
TimelineStorageMonitor. Contributed by Prabhu Joseph
This is an automated email from the ASF dual-hosted git repository.
eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new cda9f33 YARN-8499 ATSv2 Generalize TimelineStorageMonitor. Contributed by Prabhu Joseph
cda9f33 is described below
commit cda9f3374573f0cb5ae4f26ba3fbc77aae45ec58
Author: Eric Yang <ey...@apache.org>
AuthorDate: Fri Jun 14 18:59:14 2019 -0400
YARN-8499 ATSv2 Generalize TimelineStorageMonitor.
Contributed by Prabhu Joseph
---
.../storage/TestTimelineReaderHBaseDown.java | 4 +-
.../storage/HBaseStorageMonitor.java | 90 +++++++++++++++++
.../storage/HBaseTimelineReaderImpl.java | 90 ++---------------
.../storage/TimelineStorageMonitor.java | 106 +++++++++++++++++++++
4 files changed, 206 insertions(+), 84 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
index 786f529..e738d39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
@@ -34,8 +34,8 @@ import java.util.Set;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
-import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE;
-import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.DATA_TO_RETRIEVE;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.MONITOR_FILTERS;
public class TestTimelineReaderHBaseDown {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java
new file mode 100644
index 0000000..c433aa6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
+
+/**
+ * HBase based implementation for {@link TimelineStorageMonitor}.
+ *
+ * <p>Each health check reads at most one flow activity entity through a
+ * monitor-only HBase connection. That connection is configured with 3 client
+ * retries, a 1000 ms client pause and RPC/scanner timeouts equal to the
+ * monitor interval, presumably so that a failing probe is reported quickly.
+ */
+public class HBaseStorageMonitor extends TimelineStorageMonitor {
+
+  /** Limits each probe to a single entity. */
+  protected static final TimelineEntityFilters MONITOR_FILTERS =
+      new TimelineEntityFilters.Builder().entityLimit(1L).build();
+  /** All-null data-to-retrieve settings used by the probe. */
+  protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
+      new TimelineDataToRetrieve(null, null, null, null, null, null);
+
+  private Configuration monitorHBaseConf;
+  private Connection monitorConn;
+  private TimelineEntityReader reader;
+
+  /**
+   * Creates the monitor and opens its HBase connection. The monitoring task
+   * is not scheduled until {@link #start()} is called.
+   *
+   * @param conf configuration supplying the cluster id, the monitor interval
+   *     and the timeline service HBase settings
+   * @throws Exception if the HBase configuration or connection cannot be
+   *     created
+   */
+  public HBaseStorageMonitor(Configuration conf) throws Exception {
+    super(conf, Storage.HBase);
+    this.initialize(conf);
+  }
+
+  /**
+   * Builds the probe reader and then the monitor-only HBase connection.
+   * The connection is opened last so that no later step can fail and leave
+   * it unreachable, and therefore never closed.
+   */
+  private void initialize(Configuration conf) throws Exception {
+    String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID,
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+    TimelineReaderContext monitorContext =
+        new TimelineReaderContext(clusterId, null, null, null, null,
+            TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
+    reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(
+        monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
+
+    long monitorInterval = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS
+    );
+    monitorHBaseConf = HBaseTimelineStorageUtils.
+        getTimelineServiceHBaseConf(conf);
+    monitorHBaseConf.setInt("hbase.client.retries.number", 3);
+    monitorHBaseConf.setLong("hbase.client.pause", 1000);
+    // Bound each RPC and scanner call by the monitor interval.
+    monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
+    monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
+        monitorInterval);
+    monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
+    monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
+  }
+
+  /**
+   * Reads at most one flow activity entity from HBase.
+   *
+   * @throws Exception if the read fails; the base class then marks the
+   *     storage as down
+   */
+  @Override
+  public void healthCheck() throws Exception {
+    reader.readEntities(monitorHBaseConf, monitorConn);
+  }
+
+  /**
+   * Stops the monitoring task and closes the monitor's HBase connection.
+   * The connection is closed even if stopping the task throws.
+   *
+   * @throws Exception if stopping the task or closing the connection fails
+   */
+  @Override
+  public void stop() throws Exception {
+    try {
+      super.stop();
+    } finally {
+      monitorConn.close();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 653126e..4c71fd6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
@@ -31,8 +27,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -54,12 +48,7 @@ public class HBaseTimelineReaderImpl
private Configuration hbaseConf = null;
private Connection conn;
- private Configuration monitorHBaseConf = null;
- private Connection monitorConn;
- private ScheduledExecutorService monitorExecutorService;
- private TimelineReaderContext monitorContext;
- private long monitorInterval;
- private AtomicBoolean hbaseDown = new AtomicBoolean();
+ private TimelineStorageMonitor storageMonitor;
public HBaseTimelineReaderImpl() {
super(HBaseTimelineReaderImpl.class.getName());
@@ -68,39 +57,15 @@ public class HBaseTimelineReaderImpl
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
-
- String clusterId = conf.get(
- YarnConfiguration.RM_CLUSTER_ID,
- YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
- monitorContext =
- new TimelineReaderContext(clusterId, null, null, null, null,
- TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
- monitorInterval = conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
-
- monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
- monitorHBaseConf.setInt("hbase.client.retries.number", 3);
- monitorHBaseConf.setLong("hbase.client.pause", 1000);
- monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
- monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
- monitorInterval);
- monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
- monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
-
- monitorExecutorService = Executors.newScheduledThreadPool(1);
-
hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
+ storageMonitor = new HBaseStorageMonitor(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
- LOG.info("Scheduling HBase liveness monitor at interval {}",
- monitorInterval);
- monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0,
- monitorInterval, TimeUnit.MILLISECONDS);
+ storageMonitor.start();
}
@Override
@@ -109,31 +74,18 @@ public class HBaseTimelineReaderImpl
LOG.info("closing the hbase Connection");
conn.close();
}
- if (monitorExecutorService != null) {
- monitorExecutorService.shutdownNow();
- if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
- LOG.warn("failed to stop the monitir task in time. " +
- "will still proceed to close the monitor.");
- }
- }
- monitorConn.close();
+ storageMonitor.stop();
super.serviceStop();
}
- private void checkHBaseDown() throws IOException {
- if (hbaseDown.get()) {
- throw new IOException("HBase is down");
- }
- }
-
public boolean isHBaseDown() {
- return hbaseDown.get();
+ return storageMonitor.isStorageDown();
}
@Override
public TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException {
- checkHBaseDown();
+ storageMonitor.checkStorageIsUp();
TimelineEntityReader reader =
TimelineEntityReaderFactory.createSingleEntityReader(context,
dataToRetrieve);
@@ -144,7 +96,7 @@ public class HBaseTimelineReaderImpl
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException {
- checkHBaseDown();
+ storageMonitor.checkStorageIsUp();
TimelineEntityReader reader =
TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
filters, dataToRetrieve);
@@ -154,7 +106,7 @@ public class HBaseTimelineReaderImpl
@Override
public Set<String> getEntityTypes(TimelineReaderContext context)
throws IOException {
- checkHBaseDown();
+ storageMonitor.checkStorageIsUp();
EntityTypeReader reader = new EntityTypeReader(context);
return reader.readEntityTypes(hbaseConf, conn);
}
@@ -171,30 +123,4 @@ public class HBaseTimelineReaderImpl
}
}
- protected static final TimelineEntityFilters MONITOR_FILTERS =
- new TimelineEntityFilters.Builder().entityLimit(1L).build();
- protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
- new TimelineDataToRetrieve(null, null, null, null, null, null);
-
- private class HBaseMonitor implements Runnable {
- @Override
- public void run() {
- try {
- LOG.debug("Running HBase liveness monitor");
- TimelineEntityReader reader =
- TimelineEntityReaderFactory.createMultipleEntitiesReader(
- monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
- reader.readEntities(monitorHBaseConf, monitorConn);
-
- // on success, reset hbase down flag
- if (hbaseDown.getAndSet(false)) {
- LOG.debug("HBase request succeeded, assuming HBase up");
- }
- } catch (Exception e) {
- LOG.warn("Got failure attempting to read from timeline storage, " +
- "assuming HBase down", e);
- hbaseDown.getAndSet(true);
- }
- }
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java
new file mode 100644
index 0000000..fc96f19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * This abstract class is for monitoring Health of Timeline Storage.
+ *
+ * <p>After {@link #start()}, {@link #healthCheck()} runs on a single
+ * background thread, first immediately and then at a fixed rate given by
+ * {@link YarnConfiguration#TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS}
+ * (milliseconds). A check that throws an {@link Exception} marks the storage
+ * as down; the next successful check marks it as up again. The result is
+ * exposed through {@link #checkStorageIsUp()} and {@link #isStorageDown()}.
+ */
+public abstract class TimelineStorageMonitor {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TimelineStorageMonitor.class);
+
+  /** Different Storages supported by ATSV2. */
+  public enum Storage {
+    HBase
+  }
+
+  private ScheduledExecutorService monitorExecutorService;
+  private final long monitorInterval;
+  private final Storage storage;
+  /** Written by the monitor thread; read by the status methods. */
+  private final AtomicBoolean storageDown = new AtomicBoolean();
+
+  /**
+   * Creates a monitor. Nothing is scheduled until {@link #start()}.
+   *
+   * @param conf configuration supplying the monitor interval
+   * @param storage the storage being monitored, used in log and error
+   *     messages
+   */
+  public TimelineStorageMonitor(Configuration conf, Storage storage) {
+    this.storage = storage;
+    this.monitorInterval = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS
+    );
+  }
+
+  /**
+   * Schedules {@link #healthCheck()} on a new single-thread scheduled
+   * executor, with no initial delay and a fixed rate of the monitor interval.
+   */
+  public void start() {
+    LOG.info("Scheduling {} storage monitor at interval {}",
+        this.storage, monitorInterval);
+    monitorExecutorService = Executors.newScheduledThreadPool(1);
+    monitorExecutorService.scheduleAtFixedRate(new MonitorThread(), 0,
+        monitorInterval, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Shuts down the monitoring executor, if it was started, and waits up to
+   * 30 seconds for it to terminate. A timeout is only logged.
+   *
+   * @throws Exception if interrupted while waiting for termination
+   */
+  public void stop() throws Exception {
+    if (monitorExecutorService != null) {
+      // shutdownNow() also attempts to stop an in-flight health check.
+      monitorExecutorService.shutdownNow();
+      if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.warn("Failed to stop the monitor task in time. " +
+            "will still proceed to close the monitor.");
+      }
+    }
+  }
+
+  /**
+   * Probes the storage once; invoked periodically from the monitor thread.
+   *
+   * @throws Exception if the probe fails; this marks the storage as down
+   *     until a later probe succeeds
+   */
+  public abstract void healthCheck() throws Exception;
+
+  /**
+   * Fails fast when the last health check marked the storage as down.
+   *
+   * @throws IOException if the storage is currently considered down
+   */
+  public void checkStorageIsUp() throws IOException {
+    if (storageDown.get()) {
+      throw new IOException(storage + " is down");
+    }
+  }
+
+  /**
+   * @return true if the most recent health check failed
+   */
+  public boolean isStorageDown() {
+    return storageDown.get();
+  }
+
+  /** Runs one health check and records whether the storage is up. */
+  private class MonitorThread implements Runnable {
+    @Override
+    public void run() {
+      // NOTE(review): only Exceptions are caught. Per the
+      // ScheduledExecutorService#scheduleAtFixedRate contract, an Error
+      // escaping run() suppresses all later executions, leaving storageDown
+      // frozen at its last value.
+      try {
+        LOG.debug("Running Timeline Storage monitor");
+        healthCheck();
+        if (storageDown.getAndSet(false)) {
+          LOG.debug("{} health check succeeded, " +
+              "assuming storage is up", storage);
+        }
+      } catch (Exception e) {
+        // SLF4J logs a trailing Throwable argument as the exception.
+        LOG.warn("Got failure attempting to read from {}, " +
+            "assuming Storage is down", storage, e);
+        storageDown.set(true);
+      }
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org