You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:20:04 UTC

[28/50] [abbrv] hadoop git commit: YARN-8302. ATS v2 should handle HBase connection issue properly. Contributed by Billie Rinaldi.

YARN-8302. ATS v2 should handle HBase connection issue properly. Contributed by Billie Rinaldi.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba683204
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba683204
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba683204

Branch: refs/heads/HDDS-48
Commit: ba683204498c97654be4727ab9e128c433a45498
Parents: 0247cb6
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Fri Jul 6 15:19:01 2018 -0700
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Fri Jul 6 15:19:01 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   7 +
 .../storage/TestTimelineReaderHBaseDown.java    | 220 +++++++++++++++++++
 .../storage/HBaseTimelineReaderImpl.java        |  93 ++++++++
 3 files changed, 320 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba683204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5842d64..9156c2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3659,6 +3659,13 @@ public class YarnConfiguration extends Configuration {
       DEFAULT_TIMELINE_SERVICE_READER_WEBAPP_HTTPS_ADDRESS =
       DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS;
 
+  @Private
+  public static final String
+      TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS =
+      TIMELINE_SERVICE_READER_PREFIX + "storage-monitor.interval-ms";
+  public static final long
+      DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS = 60 * 1000;
+
   /**
    * Marked collector properties as Private since it runs as an auxiliary service.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba683204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
new file mode 100644
index 0000000..786f529
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS;
+
+public class TestTimelineReaderHBaseDown {
+
+  @Test(timeout=300000)
+  public void testTimelineReaderHBaseUp() throws Exception {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+    configure(util);
+    try {
+      util.startMiniCluster();
+      DataGeneratorForTest.createSchema(util.getConfiguration());
+      DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+      TimelineReaderServer server = getTimelineReaderServer();
+      server.init(util.getConfiguration());
+      HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+      server.start();
+      checkQuery(htr);
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
+  @Test(timeout=300000)
+  public void testTimelineReaderInitWhenHBaseIsDown() throws
+      TimeoutException, InterruptedException {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+    configure(util);
+    TimelineReaderServer server = getTimelineReaderServer();
+
+    // init timeline reader when hbase is not running
+    server.init(util.getConfiguration());
+    HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+    server.start();
+    waitForHBaseDown(htr);
+  }
+
+  @Test(timeout=300000)
+  public void testTimelineReaderDetectsHBaseDown() throws Exception {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+    configure(util);
+
+    try {
+      // start minicluster
+      util.startMiniCluster();
+      DataGeneratorForTest.createSchema(util.getConfiguration());
+      DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+      // init timeline reader
+      TimelineReaderServer server = getTimelineReaderServer();
+      server.init(util.getConfiguration());
+      HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+
+      // stop hbase after timeline reader init
+      util.shutdownMiniHBaseCluster();
+
+      // start server and check that it detects hbase is down
+      server.start();
+      waitForHBaseDown(htr);
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
+  @Test(timeout=300000)
+  public void testTimelineReaderDetectsZooKeeperDown() throws Exception {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+    configure(util);
+
+    try {
+      // start minicluster
+      util.startMiniCluster();
+      DataGeneratorForTest.createSchema(util.getConfiguration());
+      DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+      // init timeline reader
+      TimelineReaderServer server = getTimelineReaderServer();
+      server.init(util.getConfiguration());
+      HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+
+      // stop hbase and zookeeper after timeline reader init
+      util.shutdownMiniCluster();
+
+      // start server and check that it detects hbase is down
+      server.start();
+      waitForHBaseDown(htr);
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
+  @Test(timeout=300000)
+  public void testTimelineReaderRecoversAfterHBaseReturns() throws Exception {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+    configure(util);
+
+    try {
+      // start minicluster
+      util.startMiniCluster();
+      DataGeneratorForTest.createSchema(util.getConfiguration());
+      DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+      // init timeline reader
+      TimelineReaderServer server = getTimelineReaderServer();
+      server.init(util.getConfiguration());
+      HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+
+      // stop hbase after timeline reader init
+      util.shutdownMiniHBaseCluster();
+
+      // start server and check that it detects hbase is down
+      server.start();
+      waitForHBaseDown(htr);
+
+      util.startMiniHBaseCluster(1, 1);
+      GenericTestUtils.waitFor(() -> !htr.isHBaseDown(), 1000, 150000);
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
+  private static void waitForHBaseDown(HBaseTimelineReaderImpl htr) throws
+      TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> htr.isHBaseDown(), 1000, 150000);
+    try {
+      checkQuery(htr);
+      Assert.fail("Query should fail when HBase is down");
+    } catch (IOException e) {
+      Assert.assertEquals("HBase is down", e.getMessage());
+    }
+  }
+
+  private static void checkQuery(HBaseTimelineReaderImpl htr) throws
+      IOException {
+    TimelineReaderContext context =
+        new TimelineReaderContext(YarnConfiguration.DEFAULT_RM_CLUSTER_ID,
+            null, null, null, null, TimelineEntityType
+            .YARN_FLOW_ACTIVITY.toString(), null, null);
+    Set<TimelineEntity> entities = htr.getEntities(context, MONITOR_FILTERS,
+        DATA_TO_RETRIEVE);
+  }
+
+  private static void configure(HBaseTestingUtility util) {
+    Configuration config = util.getConfiguration();
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    config.set(YarnConfiguration.TIMELINE_SERVICE_READER_WEBAPP_ADDRESS,
+        "localhost:0");
+    config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+    config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+        "org.apache.hadoop.yarn.server.timelineservice.storage."
+            + "HBaseTimelineReaderImpl");
+    config.setInt("hfile.format.version", 3);
+    config.setLong(TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, 5000);
+  }
+
+  private static TimelineReaderServer getTimelineReaderServer() {
+    return new TimelineReaderServer() {
+      @Override
+      protected void addFilters(Configuration conf) {
+        // The parent code uses hadoop-common jar from this version of
+        // Hadoop, but the tests are using hadoop-common jar from
+        // ${hbase-compatible-hadoop.version}.  This version uses Jetty 9
+        // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there
+        // are many differences, including classnames and packages.
+        // We do nothing here, so that we don't cause a NoSuchMethodError or
+        // NoClassDefFoundError.
+        // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3,
+        // we should be able to remove this @Override.
+      }
+    };
+  }
+
+  private static HBaseTimelineReaderImpl getHBaseTimelineReaderImpl(
+      TimelineReaderServer server) {
+    for (Service s: server.getServices()) {
+      if (s instanceof HBaseTimelineReaderImpl) {
+        return (HBaseTimelineReaderImpl) s;
+      }
+    }
+    throw new IllegalStateException("Couldn't find HBaseTimelineReaderImpl");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba683204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 1ebfab2..fadfd14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -47,6 +53,12 @@ public class HBaseTimelineReaderImpl
 
   private Configuration hbaseConf = null;
   private Connection conn;
+  private Configuration monitorHBaseConf = null;
+  private Connection monitorConn;
+  private ScheduledExecutorService monitorExecutorService;
+  private TimelineReaderContext monitorContext;
+  private long monitorInterval;
+  private AtomicBoolean hbaseDown = new AtomicBoolean();
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -55,22 +67,72 @@ public class HBaseTimelineReaderImpl
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
+
+    String clusterId = conf.get(
+        YarnConfiguration.RM_CLUSTER_ID,
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+    monitorContext =
+        new TimelineReaderContext(clusterId, null, null, null, null,
+            TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
+    monitorInterval = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
+
+    monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+    monitorHBaseConf.setInt("hbase.client.retries.number", 3);
+    monitorHBaseConf.setLong("hbase.client.pause", 1000);
+    monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
+    monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
+        monitorInterval);
+    monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
+    monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
+
+    monitorExecutorService = Executors.newScheduledThreadPool(1);
+
     hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
   }
 
   @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    LOG.info("Scheduling HBase liveness monitor at interval {}",
+        monitorInterval);
+    monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0,
+        monitorInterval, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
   protected void serviceStop() throws Exception {
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();
     }
+    if (monitorExecutorService != null) {
+      monitorExecutorService.shutdownNow();
+      if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.warn("failed to stop the monitor task in time. " +
+            "will still proceed to close the monitor.");
+      }
+    }
+    monitorConn.close();
     super.serviceStop();
   }
 
+  private void checkHBaseDown() throws IOException {
+    if (hbaseDown.get()) {
+      throw new IOException("HBase is down");
+    }
+  }
+
+  public boolean isHBaseDown() {
+    return hbaseDown.get();
+  }
+
   @Override
   public TimelineEntity getEntity(TimelineReaderContext context,
       TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    checkHBaseDown();
     TimelineEntityReader reader =
         TimelineEntityReaderFactory.createSingleEntityReader(context,
             dataToRetrieve);
@@ -81,6 +143,7 @@ public class HBaseTimelineReaderImpl
   public Set<TimelineEntity> getEntities(TimelineReaderContext context,
       TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
       throws IOException {
+    checkHBaseDown();
     TimelineEntityReader reader =
         TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
             filters, dataToRetrieve);
@@ -90,7 +153,37 @@ public class HBaseTimelineReaderImpl
   @Override
   public Set<String> getEntityTypes(TimelineReaderContext context)
       throws IOException {
+    checkHBaseDown();
     EntityTypeReader reader = new EntityTypeReader(context);
     return reader.readEntityTypes(hbaseConf, conn);
   }
+
+  protected static final TimelineEntityFilters MONITOR_FILTERS =
+      new TimelineEntityFilters.Builder().entityLimit(1L).build();
+  protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
+      new TimelineDataToRetrieve(null, null, null, null, null, null);
+
+  private class HBaseMonitor implements Runnable {
+    @Override
+    public void run() {
+      try {
+        LOG.info("Running HBase liveness monitor");
+        TimelineEntityReader reader =
+            TimelineEntityReaderFactory.createMultipleEntitiesReader(
+                monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
+        reader.readEntities(monitorHBaseConf, monitorConn);
+
+        // on success, reset hbase down flag
+        if (hbaseDown.getAndSet(false)) {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("HBase request succeeded, assuming HBase up");
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("Got failure attempting to read from timeline storage, " +
+            "assuming HBase down", e);
+        hbaseDown.getAndSet(true);
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org