You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2015/06/22 17:44:09 UTC
ambari git commit: AMBARI-12028. Add Better Error Handling for
accessing HDFS (Erik Bergenholtz via rlevas)
Repository: ambari
Updated Branches:
refs/heads/branch-2.1 c4d2e839b -> 5222188ac
AMBARI-12028. Add Better Error Handling for accessing HDFS (Erik Bergenholtz via rlevas)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5222188a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5222188a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5222188a
Branch: refs/heads/branch-2.1
Commit: 5222188ac7df2a3404da6b25ea01f9940cbd5cc4
Parents: c4d2e83
Author: Erik Bergenholtz <eb...@hortonworks.com>
Authored: Mon Jun 22 11:42:37 2015 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Mon Jun 22 11:42:37 2015 -0400
----------------------------------------------------------------------
contrib/views/hive/pom.xml | 7 +-
.../ui/hive-web/app/controllers/index.js | 4 +-
.../ui/hive-web/app/controllers/open-queries.js | 1 -
.../hive-web/app/controllers/visual-explain.js | 6 --
contrib/views/utils/pom.xml | 7 +-
.../apache/ambari/view/utils/hdfs/HdfsApi.java | 82 +++++++++++++++-----
6 files changed, 77 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/views/hive/pom.xml b/contrib/views/hive/pom.xml
index 2130c4a..afb80a3 100644
--- a/contrib/views/hive/pom.xml
+++ b/contrib/views/hive/pom.xml
@@ -194,10 +194,15 @@
<artifactId>commons-validator</artifactId>
<version>1.4.0</version>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
</dependencies>
<properties>
- <hadoop-version>2.6.0</hadoop-version>
+ <hadoop-version>2.7.0</hadoop-version>
<ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir>
<hive-version>1.0.0</hive-version>
<ambari.version>2.1.0-SNAPSHOT</ambari.version>
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
index b3cd127..72fd37c 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
@@ -109,7 +109,7 @@ export default Ember.Controller.extend({
return true;
}.property('model.isRunning', 'queryParams.@each.value'),
- parseQueryParams: function () {
+ currentQueryObserver: function () {
var query = this.get('openQueries.currentQuery.fileContent'),
param,
updatedParams = [],
@@ -129,6 +129,8 @@ export default Ember.Controller.extend({
});
currentParams.setObjects(updatedParams);
+
+ this.set('visualExplain.shouldChangeGraph', true);
}.observes('openQueries.currentQuery.fileContent'),
_executeQuery: function (referrer, shouldExplain, shouldGetVisualExplain) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
index 5ab46f6..611d8fe 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
@@ -196,7 +196,6 @@ export default Ember.ArrayController.extend({
var content = query.get('fileContent');
content = self.get('index').buildQuery(query);
content = self.get('index').bindQueryParams(content);
- content = self.get('index').prependQuerySettings(content);
//update query tab path with saved model id if its a new record
if (wasNew) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
index d6ae8c4..71e3c87 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
@@ -24,14 +24,8 @@ export default Ember.Controller.extend({
notifyService: Ember.inject.service(constants.namingConventions.notify),
index: Ember.inject.controller(),
- openQueries: Ember.inject.controller(),
-
verticesProgress: Ember.computed.alias('jobProgressService.currentJob.stages'),
- observeCurrentQuery: function () {
- this.set('shouldChangeGraph', true);
- }.observes('openQueries.currentQuery', 'openQueries.currentQuery.fileContent'),
-
actions: {
onTabOpen: function () {
var self = this;
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/utils/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/views/utils/pom.xml b/contrib/views/utils/pom.xml
index 3e1ca4f..f930061 100644
--- a/contrib/views/utils/pom.xml
+++ b/contrib/views/utils/pom.xml
@@ -116,11 +116,16 @@
<artifactId>gson</artifactId>
<version>2.2.2</version>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
</dependencies>
<properties>
<ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir>
- <hadoop-version>2.6.0</hadoop-version>
+ <hadoop-version>2.7.0</hadoop-version>
</properties>
<build>
</build>
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
----------------------------------------------------------------------
diff --git a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
index 80603b5..c7ae952 100644
--- a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
+++ b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.json.simple.JSONArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
@@ -42,7 +44,10 @@ import javax.security.auth.Subject;
* Hdfs Business Delegate
*/
public class HdfsApi {
-private final Configuration conf;
+ private final static Logger LOG =
+ LoggerFactory.getLogger(HdfsApi.class);
+
+ private final Configuration conf;
private final Map<String, String> authParams;
private FileSystem fs;
@@ -60,7 +65,7 @@ private final Configuration conf;
conf = configurationBuilder.buildConfig();
ugi = UserGroupInformation.createProxyUser(username, getProxyUser());
- fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ fs = execute(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return FileSystem.get(conf);
}
@@ -100,7 +105,7 @@ private final Configuration conf;
*/
public FileStatus[] listdir(final String path) throws FileNotFoundException,
IOException, InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<FileStatus[]>() {
+ return execute(new PrivilegedExceptionAction<FileStatus[]>() {
public FileStatus[] run() throws FileNotFoundException, Exception {
return fs.listStatus(new Path(path));
}
@@ -117,7 +122,7 @@ private final Configuration conf;
*/
public FileStatus getFileStatus(final String path) throws IOException,
FileNotFoundException, InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<FileStatus>() {
+ return execute(new PrivilegedExceptionAction<FileStatus>() {
public FileStatus run() throws FileNotFoundException, IOException {
return fs.getFileStatus(new Path(path));
}
@@ -133,7 +138,7 @@ private final Configuration conf;
*/
public boolean mkdir(final String path) throws IOException,
InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ return execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
return fs.mkdirs(new Path(path));
}
@@ -150,7 +155,7 @@ private final Configuration conf;
*/
public boolean rename(final String src, final String dst) throws IOException,
InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ return execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
return fs.rename(new Path(src), new Path(dst));
}
@@ -163,7 +168,7 @@ private final Configuration conf;
* @throws Exception
*/
public boolean trashEnabled() throws Exception {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ return execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws IOException {
Trash tr = new Trash(fs, conf);
return tr.isEnabled();
@@ -177,7 +182,7 @@ private final Configuration conf;
* @throws Exception
*/
public Path getHomeDir() throws Exception {
- return ugi.doAs(new PrivilegedExceptionAction<Path>() {
+ return execute(new PrivilegedExceptionAction<Path>() {
public Path run() throws IOException {
return fs.getHomeDirectory();
}
@@ -190,7 +195,7 @@ private final Configuration conf;
* @throws Exception
*/
public synchronized FsStatus getStatus() throws Exception {
- return ugi.doAs(new PrivilegedExceptionAction<FsStatus>() {
+ return execute(new PrivilegedExceptionAction<FsStatus>() {
public FsStatus run() throws IOException {
return fs.getStatus();
}
@@ -203,7 +208,7 @@ private final Configuration conf;
* @throws Exception
*/
public Path getTrashDir() throws Exception {
- return ugi.doAs(new PrivilegedExceptionAction<Path>() {
+ return execute(new PrivilegedExceptionAction<Path>() {
public Path run() throws IOException {
TrashPolicy trashPolicy = TrashPolicy.getInstance(conf, fs,
fs.getHomeDirectory());
@@ -246,7 +251,7 @@ private final Configuration conf;
* @throws Exception
*/
public Void emptyTrash() throws Exception {
- return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ return execute(new PrivilegedExceptionAction<Void>() {
public Void run() throws IOException {
Trash tr = new Trash(fs, conf);
tr.expunge();
@@ -264,7 +269,7 @@ private final Configuration conf;
*/
public boolean moveToTrash(final String path) throws IOException,
InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ return execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
return Trash.moveToAppropriateTrash(fs, new Path(path), conf);
}
@@ -281,7 +286,7 @@ private final Configuration conf;
*/
public boolean delete(final String path, final boolean recursive)
throws IOException, InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ return execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
return fs.delete(new Path(path), recursive);
}
@@ -298,7 +303,7 @@ private final Configuration conf;
*/
public FSDataOutputStream create(final String path, final boolean overwrite)
throws IOException, InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
+ return execute(new PrivilegedExceptionAction<FSDataOutputStream>() {
public FSDataOutputStream run() throws Exception {
return fs.create(new Path(path), overwrite);
}
@@ -314,7 +319,7 @@ private final Configuration conf;
*/
public FSDataInputStream open(final String path) throws IOException,
InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<FSDataInputStream>() {
+ return execute(new PrivilegedExceptionAction<FSDataInputStream>() {
public FSDataInputStream run() throws Exception {
return fs.open(new Path(path));
}
@@ -330,7 +335,7 @@ private final Configuration conf;
*/
public boolean chmod(final String path, final String permissions) throws IOException,
InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ return execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
try {
fs.setPermission(new Path(path), FsPermission.valueOf(permissions));
@@ -349,8 +354,8 @@ private final Configuration conf;
* @throws java.io.IOException
* @throws InterruptedException
*/
- public synchronized void copy(final String src, final String dest) throws IOException, InterruptedException, HdfsApiException {
- boolean result = ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ public void copy(final String src, final String dest) throws IOException, InterruptedException, HdfsApiException {
+ boolean result = execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
return FileUtil.copy(fs, new Path(src), fs, new Path(dest), false, conf);
}
@@ -361,8 +366,8 @@ private final Configuration conf;
}
}
- public synchronized boolean exists(final String newFilePath) throws IOException, InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ public boolean exists(final String newFilePath) throws IOException, InterruptedException {
+ return execute(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
return fs.exists(new Path(newFilePath));
}
@@ -370,6 +375,43 @@ private final Configuration conf;
}
/**
+ * Executes action on HDFS using doAs
+ * @param action strategy object
+ * @param <T> result type
+ * @return result of operation
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public <T> T execute(PrivilegedExceptionAction<T> action)
+ throws IOException, InterruptedException {
+ T result = null;
+
+ // Retry strategy applied here due to HDFS-1058. HDFS can throw random
+ // IOException about retrieving block from DN if concurrent read/write
+ // on specific file is performed (see details on HDFS-1058).
+ int tryNumber = 0;
+ boolean succeeded = false;
+ do {
+ tryNumber += 1;
+ try {
+ result = ugi.doAs(action);
+ succeeded = true;
+ } catch (IOException ex) {
+ if (!ex.getMessage().contains("Cannot obtain block length for")) {
+ throw ex;
+ }
+ if (tryNumber >= 3) {
+ throw ex;
+ }
+ LOG.info("HDFS threw 'IOException: Cannot obtain block length' exception. " +
+ "Retrying... Try #" + (tryNumber + 1));
+ Thread.sleep(1000); //retry after 1 second
+ }
+ } while (!succeeded);
+ return result;
+ }
+
+ /**
* Converts a Hadoop permission into a Unix permission symbolic representation
* (i.e. -rwxr--r--) or default if the permission is NULL.
*