You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2019/06/14 23:01:26 UTC

[hadoop] branch trunk updated: YARN-8499 ATSv2 Generalize TimelineStorageMonitor. Contributed by Prabhu Joseph

This is an automated email from the ASF dual-hosted git repository.

eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cda9f33  YARN-8499 ATSv2 Generalize TimelineStorageMonitor.            Contributed by Prabhu Joseph
cda9f33 is described below

commit cda9f3374573f0cb5ae4f26ba3fbc77aae45ec58
Author: Eric Yang <ey...@apache.org>
AuthorDate: Fri Jun 14 18:59:14 2019 -0400

    YARN-8499 ATSv2 Generalize TimelineStorageMonitor.
               Contributed by Prabhu Joseph
---
 .../storage/TestTimelineReaderHBaseDown.java       |   4 +-
 .../storage/HBaseStorageMonitor.java               |  90 +++++++++++++++++
 .../storage/HBaseTimelineReaderImpl.java           |  90 ++---------------
 .../storage/TimelineStorageMonitor.java            | 106 +++++++++++++++++++++
 4 files changed, 206 insertions(+), 84 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
index 786f529..e738d39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
@@ -34,8 +34,8 @@ import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
-import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE;
-import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.DATA_TO_RETRIEVE;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.MONITOR_FILTERS;
 
 public class TestTimelineReaderHBaseDown {
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java
new file mode 100644
index 0000000..c433aa6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
+
+/**
+ * HBase based implementation for {@link TimelineStorageMonitor}.
+ */
+public class HBaseStorageMonitor extends TimelineStorageMonitor {
+
+  protected static final TimelineEntityFilters MONITOR_FILTERS =
+      new TimelineEntityFilters.Builder().entityLimit(1L).build();
+  protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
+      new TimelineDataToRetrieve(null, null, null, null, null, null);
+
+  private Configuration monitorHBaseConf;
+  private Connection monitorConn;
+  private TimelineEntityReader reader;
+
+  public HBaseStorageMonitor(Configuration conf) throws Exception {
+    super(conf, Storage.HBase);
+    this.initialize(conf);
+  }
+
+  private void initialize(Configuration conf) throws Exception {
+    String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID,
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+    TimelineReaderContext monitorContext =
+        new TimelineReaderContext(clusterId, null, null, null, null,
+        TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
+    reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(
+        monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
+
+    monitorHBaseConf = HBaseTimelineStorageUtils.
+        getTimelineServiceHBaseConf(conf);
+    monitorHBaseConf.setInt("hbase.client.retries.number", 3);
+    monitorHBaseConf.setLong("hbase.client.pause", 1000);
+    long monitorInterval = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
+    monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
+    monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
+        monitorInterval);
+    monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
+    // Connect last so that a failure above cannot leak an open connection.
+    monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
+  }
+
+  /** Probes HBase with a one-entity flow activity read. */
+  @Override
+  public void healthCheck() throws Exception {
+    reader.readEntities(monitorHBaseConf, monitorConn);
+  }
+
+  @Override
+  public void stop() throws Exception {
+    try {
+      super.stop();
+    } finally {
+      // Close the connection even if stopping the scheduler throws.
+      monitorConn.close();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 653126e..4c71fd6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -31,8 +27,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -54,12 +48,7 @@ public class HBaseTimelineReaderImpl
 
   private Configuration hbaseConf = null;
   private Connection conn;
-  private Configuration monitorHBaseConf = null;
-  private Connection monitorConn;
-  private ScheduledExecutorService monitorExecutorService;
-  private TimelineReaderContext monitorContext;
-  private long monitorInterval;
-  private AtomicBoolean hbaseDown = new AtomicBoolean();
+  private TimelineStorageMonitor storageMonitor;
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -68,39 +57,15 @@ public class HBaseTimelineReaderImpl
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
-
-    String clusterId = conf.get(
-        YarnConfiguration.RM_CLUSTER_ID,
-        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
-    monitorContext =
-        new TimelineReaderContext(clusterId, null, null, null, null,
-            TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
-    monitorInterval = conf.getLong(
-        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
-
-    monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
-    monitorHBaseConf.setInt("hbase.client.retries.number", 3);
-    monitorHBaseConf.setLong("hbase.client.pause", 1000);
-    monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
-    monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
-        monitorInterval);
-    monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
-    monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
-
-    monitorExecutorService = Executors.newScheduledThreadPool(1);
-
     hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
+    storageMonitor = new HBaseStorageMonitor(conf);
   }
 
   @Override
   protected void serviceStart() throws Exception {
     super.serviceStart();
-    LOG.info("Scheduling HBase liveness monitor at interval {}",
-        monitorInterval);
-    monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0,
-        monitorInterval, TimeUnit.MILLISECONDS);
+    storageMonitor.start();
   }
 
   @Override
@@ -109,31 +74,18 @@ public class HBaseTimelineReaderImpl
       LOG.info("closing the hbase Connection");
       conn.close();
     }
-    if (monitorExecutorService != null) {
-      monitorExecutorService.shutdownNow();
-      if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
-        LOG.warn("failed to stop the monitir task in time. " +
-            "will still proceed to close the monitor.");
-      }
-    }
-    monitorConn.close();
+    storageMonitor.stop();
     super.serviceStop();
   }
 
-  private void checkHBaseDown() throws IOException {
-    if (hbaseDown.get()) {
-      throw new IOException("HBase is down");
-    }
-  }
-
   public boolean isHBaseDown() {
-    return hbaseDown.get();
+    return storageMonitor.isStorageDown();
   }
 
   @Override
   public TimelineEntity getEntity(TimelineReaderContext context,
       TimelineDataToRetrieve dataToRetrieve) throws IOException {
-    checkHBaseDown();
+    storageMonitor.checkStorageIsUp();
     TimelineEntityReader reader =
         TimelineEntityReaderFactory.createSingleEntityReader(context,
             dataToRetrieve);
@@ -144,7 +96,7 @@ public class HBaseTimelineReaderImpl
   public Set<TimelineEntity> getEntities(TimelineReaderContext context,
       TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
       throws IOException {
-    checkHBaseDown();
+    storageMonitor.checkStorageIsUp();
     TimelineEntityReader reader =
         TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
             filters, dataToRetrieve);
@@ -154,7 +106,7 @@ public class HBaseTimelineReaderImpl
   @Override
   public Set<String> getEntityTypes(TimelineReaderContext context)
       throws IOException {
-    checkHBaseDown();
+    storageMonitor.checkStorageIsUp();
     EntityTypeReader reader = new EntityTypeReader(context);
     return reader.readEntityTypes(hbaseConf, conn);
   }
@@ -171,30 +123,4 @@ public class HBaseTimelineReaderImpl
     }
   }
 
-  protected static final TimelineEntityFilters MONITOR_FILTERS =
-      new TimelineEntityFilters.Builder().entityLimit(1L).build();
-  protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
-      new TimelineDataToRetrieve(null, null, null, null, null, null);
-
-  private class HBaseMonitor implements Runnable {
-    @Override
-    public void run() {
-      try {
-        LOG.debug("Running HBase liveness monitor");
-        TimelineEntityReader reader =
-            TimelineEntityReaderFactory.createMultipleEntitiesReader(
-                monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
-        reader.readEntities(monitorHBaseConf, monitorConn);
-
-        // on success, reset hbase down flag
-        if (hbaseDown.getAndSet(false)) {
-          LOG.debug("HBase request succeeded, assuming HBase up");
-        }
-      } catch (Exception e) {
-        LOG.warn("Got failure attempting to read from timeline storage, " +
-            "assuming HBase down", e);
-        hbaseDown.getAndSet(true);
-      }
-    }
-  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java
new file mode 100644
index 0000000..fc96f19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * This abstract class is for monitoring Health of Timeline Storage.
+ */
+public abstract class TimelineStorageMonitor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TimelineStorageMonitor.class);
+
+  /** Different Storages supported by ATSV2. */
+  public enum Storage { HBase }
+
+  private final AtomicBoolean storageDown = new AtomicBoolean();
+  private final Storage storage;
+  private final long monitorInterval;
+  private ScheduledExecutorService monitorExecutorService;
+
+  public TimelineStorageMonitor(Configuration conf, Storage storage) {
+    this.storage = storage;
+    this.monitorInterval = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
+  }
+
+  /** Schedules the health check at a fixed rate with no initial delay. */
+  public void start() {
+    LOG.info("Scheduling {} storage monitor at interval {}",
+        this.storage, monitorInterval);
+    monitorExecutorService = Executors.newScheduledThreadPool(1);
+    monitorExecutorService.scheduleAtFixedRate(new MonitorTask(), 0,
+        monitorInterval, TimeUnit.MILLISECONDS);
+  }
+
+  /** Shuts the scheduler down and waits up to 30 seconds for the task. */
+  public void stop() throws Exception {
+    if (monitorExecutorService == null) {
+      return;
+    }
+    monitorExecutorService.shutdownNow();
+    if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
+      LOG.warn("Failed to stop the monitor task in time. " +
+          "will still proceed to close the monitor.");
+    }
+  }
+
+  /** Storage-specific probe; an exception marks the storage as down. */
+  public abstract void healthCheck() throws Exception;
+
+  public void checkStorageIsUp() throws IOException {
+    if (storageDown.get()) {
+      throw new IOException(storage + " is down");
+    }
+  }
+
+  public boolean isStorageDown() {
+    return storageDown.get();
+  }
+
+  private class MonitorTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        LOG.debug("Running Timeline Storage monitor");
+        healthCheck();
+        if (storageDown.getAndSet(false)) {
+          LOG.debug("{} health check succeeded, assuming storage is up",
+              storage);
+        }
+      } catch (Exception e) {
+        LOG.warn(String.format("Got failure attempting to read from %s, " +
+            "assuming Storage is down", storage), e);
+        storageDown.set(true);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org