You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/07/11 23:10:17 UTC
[17/56] [abbrv] hadoop git commit: YARN-8302. ATS v2 should handle
HBase connection issue properly. Contributed by Billie Rinaldi.
YARN-8302. ATS v2 should handle HBase connection issue properly. Contributed by Billie Rinaldi.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba683204
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba683204
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba683204
Branch: refs/heads/HDFS-12943
Commit: ba683204498c97654be4727ab9e128c433a45498
Parents: 0247cb6
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Fri Jul 6 15:19:01 2018 -0700
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Fri Jul 6 15:19:01 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 7 +
.../storage/TestTimelineReaderHBaseDown.java | 220 +++++++++++++++++++
.../storage/HBaseTimelineReaderImpl.java | 93 ++++++++
3 files changed, 320 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba683204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5842d64..9156c2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3659,6 +3659,13 @@ public class YarnConfiguration extends Configuration {
DEFAULT_TIMELINE_SERVICE_READER_WEBAPP_HTTPS_ADDRESS =
DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS;
+ /**
+ * Interval (in milliseconds) at which the timeline reader's background
+ * monitor probes HBase storage for liveness.
+ */
+ @Private
+ public static final String
+ TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS =
+ TIMELINE_SERVICE_READER_PREFIX + "storage-monitor.interval-ms";
+ /** Default storage-monitor probe interval: 60 seconds. */
+ public static final long
+ DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS = 60 * 1000;
+
/**
* Marked collector properties as Private since it runs as an auxiliary service.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba683204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
new file mode 100644
index 0000000..786f529
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS;
+
+/**
+ * Tests that the timeline reader detects an unreachable HBase backend,
+ * fails read queries fast while it is down, and recovers automatically
+ * once HBase (or ZooKeeper) becomes reachable again.
+ */
+public class TestTimelineReaderHBaseDown {
+
+ // Happy path: with the mini HBase cluster running, a flow-activity
+ // query through the reader succeeds.
+ @Test(timeout=300000)
+ public void testTimelineReaderHBaseUp() throws Exception {
+ HBaseTestingUtility util = new HBaseTestingUtility();
+ configure(util);
+ try {
+ util.startMiniCluster();
+ DataGeneratorForTest.createSchema(util.getConfiguration());
+ DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+ TimelineReaderServer server = getTimelineReaderServer();
+ server.init(util.getConfiguration());
+ HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+ server.start();
+ checkQuery(htr);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ // The reader must initialize and start cleanly even when HBase was
+ // never started, and then report HBase as down.
+ @Test(timeout=300000)
+ public void testTimelineReaderInitWhenHBaseIsDown() throws
+ TimeoutException, InterruptedException {
+ HBaseTestingUtility util = new HBaseTestingUtility();
+ configure(util);
+ TimelineReaderServer server = getTimelineReaderServer();
+
+ // init timeline reader when hbase is not running
+ server.init(util.getConfiguration());
+ HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+ server.start();
+ waitForHBaseDown(htr);
+ }
+
+ // HBase stops after reader init (ZooKeeper stays up); the monitor
+ // must still flag HBase as down.
+ @Test(timeout=300000)
+ public void testTimelineReaderDetectsHBaseDown() throws Exception {
+ HBaseTestingUtility util = new HBaseTestingUtility();
+ configure(util);
+
+ try {
+ // start minicluster
+ util.startMiniCluster();
+ DataGeneratorForTest.createSchema(util.getConfiguration());
+ DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+ // init timeline reader
+ TimelineReaderServer server = getTimelineReaderServer();
+ server.init(util.getConfiguration());
+ HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+
+ // stop hbase after timeline reader init
+ util.shutdownMiniHBaseCluster();
+
+ // start server and check that it detects hbase is down
+ server.start();
+ waitForHBaseDown(htr);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ // Both HBase and ZooKeeper stop after reader init; the monitor must
+ // still detect the outage (ZK retries are capped in serviceInit).
+ @Test(timeout=300000)
+ public void testTimelineReaderDetectsZooKeeperDown() throws Exception {
+ HBaseTestingUtility util = new HBaseTestingUtility();
+ configure(util);
+
+ try {
+ // start minicluster
+ util.startMiniCluster();
+ DataGeneratorForTest.createSchema(util.getConfiguration());
+ DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+ // init timeline reader
+ TimelineReaderServer server = getTimelineReaderServer();
+ server.init(util.getConfiguration());
+ HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+
+ // stop hbase and zookeeper after timeline reader init
+ util.shutdownMiniCluster();
+
+ // start server and check that it detects hbase is down
+ server.start();
+ waitForHBaseDown(htr);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ // After an outage is detected, restarting HBase must eventually flip
+ // the reader's down-flag back to false without a reader restart.
+ @Test(timeout=300000)
+ public void testTimelineReaderRecoversAfterHBaseReturns() throws Exception {
+ HBaseTestingUtility util = new HBaseTestingUtility();
+ configure(util);
+
+ try {
+ // start minicluster
+ util.startMiniCluster();
+ DataGeneratorForTest.createSchema(util.getConfiguration());
+ DataGeneratorForTest.loadApps(util, System.currentTimeMillis());
+
+ // init timeline reader
+ TimelineReaderServer server = getTimelineReaderServer();
+ server.init(util.getConfiguration());
+ HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server);
+
+ // stop hbase after timeline reader init
+ util.shutdownMiniHBaseCluster();
+
+ // start server and check that it detects hbase is down
+ server.start();
+ waitForHBaseDown(htr);
+
+ util.startMiniHBaseCluster(1, 1);
+ GenericTestUtils.waitFor(() -> !htr.isHBaseDown(), 1000, 150000);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ // Waits (up to 150s, polling every 1s) for the monitor to flag HBase
+ // down, then verifies reads fail fast with the expected message.
+ private static void waitForHBaseDown(HBaseTimelineReaderImpl htr) throws
+ TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(() -> htr.isHBaseDown(), 1000, 150000);
+ try {
+ checkQuery(htr);
+ Assert.fail("Query should fail when HBase is down");
+ } catch (IOException e) {
+ Assert.assertEquals("HBase is down", e.getMessage());
+ }
+ }
+
+ // Issues the same minimal flow-activity query the liveness monitor
+ // uses; throws IOException when the reader considers HBase down.
+ private static void checkQuery(HBaseTimelineReaderImpl htr) throws
+ IOException {
+ TimelineReaderContext context =
+ new TimelineReaderContext(YarnConfiguration.DEFAULT_RM_CLUSTER_ID,
+ null, null, null, null, TimelineEntityType
+ .YARN_FLOW_ACTIVITY.toString(), null, null);
+ // NOTE(review): the result is intentionally unused; only a successful
+ // (non-throwing) call matters here.
+ Set<TimelineEntity> entities = htr.getEntities(context, MONITOR_FILTERS,
+ DATA_TO_RETRIEVE);
+ }
+
+ // Common test configuration: ATSv2 enabled, HBase reader impl, and a
+ // short 5s monitor interval so outage detection is fast in tests.
+ private static void configure(HBaseTestingUtility util) {
+ Configuration config = util.getConfiguration();
+ config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+ config.set(YarnConfiguration.TIMELINE_SERVICE_READER_WEBAPP_ADDRESS,
+ "localhost:0");
+ config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+ config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+ "org.apache.hadoop.yarn.server.timelineservice.storage."
+ + "HBaseTimelineReaderImpl");
+ config.setInt("hfile.format.version", 3);
+ config.setLong(TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, 5000);
+ }
+
+ // Builds a reader server whose web-filter setup is disabled to avoid
+ // Jetty version clashes (explained inline below).
+ private static TimelineReaderServer getTimelineReaderServer() {
+ return new TimelineReaderServer() {
+ @Override
+ protected void addFilters(Configuration conf) {
+ // The parent code uses hadoop-common jar from this version of
+ // Hadoop, but the tests are using hadoop-common jar from
+ // ${hbase-compatible-hadoop.version}. This version uses Jetty 9
+ // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there
+ // are many differences, including classnames and packages.
+ // We do nothing here, so that we don't cause a NoSuchMethodError or
+ // NoClassDefFoundError.
+ // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3,
+ // we should be able to remove this @Override.
+ }
+ };
+ }
+
+ // Digs the HBaseTimelineReaderImpl child service out of the composite
+ // reader server so tests can observe its down-flag directly.
+ private static HBaseTimelineReaderImpl getHBaseTimelineReaderImpl(
+ TimelineReaderServer server) {
+ for (Service s: server.getServices()) {
+ if (s instanceof HBaseTimelineReaderImpl) {
+ return (HBaseTimelineReaderImpl) s;
+ }
+ }
+ throw new IllegalStateException("Couldn't find HBaseTimelineReaderImpl");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba683204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 1ebfab2..fadfd14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -47,6 +53,12 @@ public class HBaseTimelineReaderImpl
private Configuration hbaseConf = null;
private Connection conn;
+ private Configuration monitorHBaseConf = null;
+ private Connection monitorConn;
+ private ScheduledExecutorService monitorExecutorService;
+ private TimelineReaderContext monitorContext;
+ private long monitorInterval;
+ private AtomicBoolean hbaseDown = new AtomicBoolean();
public HBaseTimelineReaderImpl() {
super(HBaseTimelineReaderImpl.class.getName());
@@ -55,22 +67,72 @@ public class HBaseTimelineReaderImpl
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
+
+ // Context for the liveness probe: a minimal flow-activity query
+ // against the configured cluster id.
+ String clusterId = conf.get(
+ YarnConfiguration.RM_CLUSTER_ID,
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+ monitorContext =
+ new TimelineReaderContext(clusterId, null, null, null, null,
+ TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
+ monitorInterval = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
+
+ // The monitor gets its own connection with aggressive retry/timeout
+ // settings so an HBase outage is detected within roughly one probe
+ // interval instead of hanging on the default client retries.
+ monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+ monitorHBaseConf.setInt("hbase.client.retries.number", 3);
+ monitorHBaseConf.setLong("hbase.client.pause", 1000);
+ monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
+ monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
+ monitorInterval);
+ // Cap ZooKeeper recovery retries so a ZK outage also fails fast.
+ monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
+ monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
+
+ // Single-threaded scheduler; the probe itself is scheduled in
+ // serviceStart().
+ monitorExecutorService = Executors.newScheduledThreadPool(1);
+
hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
}
@Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ LOG.info("Scheduling HBase liveness monitor at interval {}",
+ monitorInterval);
+ // Initial delay 0: probe immediately on start, then every
+ // monitorInterval milliseconds.
+ monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0,
+ monitorInterval, TimeUnit.MILLISECONDS);
+ }
+
+
+ @Override
protected void serviceStop() throws Exception {
if (conn != null) {
LOG.info("closing the hbase Connection")
conn.close();
}
+ // Stop the probe thread before closing its connection; give it up
+ // to 30s to terminate.
+ // NOTE(review): "monitir" in the warning below is a typo for
+ // "monitor" in the log message.
+ if (monitorExecutorService != null) {
+ monitorExecutorService.shutdownNow();
+ if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOG.warn("failed to stop the monitir task in time. " +
+ "will still proceed to close the monitor.");
+ }
+ }
+ // NOTE(review): unlike conn, monitorConn is not null-checked here;
+ // if serviceInit failed before creating it, this could NPE — confirm.
+ monitorConn.close();
super.serviceStop();
}
+ /**
+ * Fails fast with an IOException when the background monitor has
+ * flagged HBase as unreachable, so reads return promptly instead of
+ * blocking on HBase client retries.
+ */
+ private void checkHBaseDown() throws IOException {
+ if (hbaseDown.get()) {
+ throw new IOException("HBase is down");
+ }
+ }
+
+ /** @return true if the monitor currently considers HBase unreachable. */
+ public boolean isHBaseDown() {
+ return hbaseDown.get();
+ }
+
@Override
public TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException {
+ checkHBaseDown();
TimelineEntityReader reader =
TimelineEntityReaderFactory.createSingleEntityReader(context,
dataToRetrieve);
@@ -81,6 +143,7 @@ public class HBaseTimelineReaderImpl
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException {
+ checkHBaseDown();
TimelineEntityReader reader =
TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
filters, dataToRetrieve);
@@ -90,7 +153,37 @@ public class HBaseTimelineReaderImpl
@Override
public Set<String> getEntityTypes(TimelineReaderContext context)
throws IOException {
+ // Fail fast if the liveness monitor has marked HBase as down.
+ checkHBaseDown();
EntityTypeReader reader = new EntityTypeReader(context);
return reader.readEntityTypes(hbaseConf, conn);
}
+
+ // Minimal query shape used by the liveness probe (protected so the
+ // tests can reuse it): at most one flow-activity entity, no extra data.
+ protected static final TimelineEntityFilters MONITOR_FILTERS =
+ new TimelineEntityFilters.Builder().entityLimit(1L).build();
+ protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
+ new TimelineDataToRetrieve(null, null, null, null, null, null);
+
+ /**
+ * Periodic probe that issues a minimal read through the dedicated
+ * monitor connection and flips the hbaseDown flag on failure, clearing
+ * it again on the first success.
+ */
+ private class HBaseMonitor implements Runnable {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Running HBase liveness monitor")
+ TimelineEntityReader reader =
+ TimelineEntityReaderFactory.createMultipleEntitiesReader(
+ monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
+ reader.readEntities(monitorHBaseConf, monitorConn);
+
+ // on success, reset hbase down flag
+ if (hbaseDown.getAndSet(false)) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("HBase request succeeded, assuming HBase up");
+ }
+ }
+ } catch (Exception e) {
+ // Any failure (RPC timeout, ZK loss, ...) marks HBase as down.
+ LOG.warn("Got failure attempting to read from timeline storage, " +
+ "assuming HBase down", e);
+ // NOTE(review): the getAndSet return value is discarded here;
+ // plain set(true) would read more clearly.
+ hbaseDown.getAndSet(true);
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org