You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2015/06/22 17:44:09 UTC

ambari git commit: AMBARI-12028. Add Better Error Handling for accessing HDFS (Erik Bergenholtz via rlevas)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 c4d2e839b -> 5222188ac


AMBARI-12028. Add Better Error Handling for accessing HDFS (Erik Bergenholtz via rlevas)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5222188a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5222188a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5222188a

Branch: refs/heads/branch-2.1
Commit: 5222188ac7df2a3404da6b25ea01f9940cbd5cc4
Parents: c4d2e83
Author: Erik Bergenholtz <eb...@hortonworks.com>
Authored: Mon Jun 22 11:42:37 2015 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Mon Jun 22 11:42:37 2015 -0400

----------------------------------------------------------------------
 contrib/views/hive/pom.xml                      |  7 +-
 .../ui/hive-web/app/controllers/index.js        |  4 +-
 .../ui/hive-web/app/controllers/open-queries.js |  1 -
 .../hive-web/app/controllers/visual-explain.js  |  6 --
 contrib/views/utils/pom.xml                     |  7 +-
 .../apache/ambari/view/utils/hdfs/HdfsApi.java  | 82 +++++++++++++++-----
 6 files changed, 77 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/views/hive/pom.xml b/contrib/views/hive/pom.xml
index 2130c4a..afb80a3 100644
--- a/contrib/views/hive/pom.xml
+++ b/contrib/views/hive/pom.xml
@@ -194,10 +194,15 @@
       <artifactId>commons-validator</artifactId>
       <version>1.4.0</version>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+    </dependency>
   </dependencies>
 
   <properties>
-    <hadoop-version>2.6.0</hadoop-version>
+    <hadoop-version>2.7.0</hadoop-version>
     <ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir>
     <hive-version>1.0.0</hive-version>
     <ambari.version>2.1.0-SNAPSHOT</ambari.version>

http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
index b3cd127..72fd37c 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js
@@ -109,7 +109,7 @@ export default Ember.Controller.extend({
     return true;
   }.property('model.isRunning', 'queryParams.@each.value'),
 
-  parseQueryParams: function () {
+  currentQueryObserver: function () {
     var query = this.get('openQueries.currentQuery.fileContent'),
         param,
         updatedParams = [],
@@ -129,6 +129,8 @@ export default Ember.Controller.extend({
     });
 
     currentParams.setObjects(updatedParams);
+
+    this.set('visualExplain.shouldChangeGraph', true);
   }.observes('openQueries.currentQuery.fileContent'),
 
   _executeQuery: function (referrer, shouldExplain, shouldGetVisualExplain) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
index 5ab46f6..611d8fe 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js
@@ -196,7 +196,6 @@ export default Ember.ArrayController.extend({
       var content = query.get('fileContent');
       content = self.get('index').buildQuery(query);
       content = self.get('index').bindQueryParams(content);
-      content = self.get('index').prependQuerySettings(content);
 
       //update query tab path with saved model id if its a new record
       if (wasNew) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
index d6ae8c4..71e3c87 100644
--- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
+++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js
@@ -24,14 +24,8 @@ export default Ember.Controller.extend({
   notifyService: Ember.inject.service(constants.namingConventions.notify),
 
   index: Ember.inject.controller(),
-  openQueries: Ember.inject.controller(),
-
   verticesProgress: Ember.computed.alias('jobProgressService.currentJob.stages'),
 
-  observeCurrentQuery: function () {
-    this.set('shouldChangeGraph', true);
-  }.observes('openQueries.currentQuery', 'openQueries.currentQuery.fileContent'),
-
   actions: {
     onTabOpen: function () {
       var self = this;

http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/utils/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/views/utils/pom.xml b/contrib/views/utils/pom.xml
index 3e1ca4f..f930061 100644
--- a/contrib/views/utils/pom.xml
+++ b/contrib/views/utils/pom.xml
@@ -116,11 +116,16 @@
       <artifactId>gson</artifactId>
       <version>2.2.2</version>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+    </dependency>
   </dependencies>
 
   <properties>
     <ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir>
-    <hadoop-version>2.6.0</hadoop-version>
+    <hadoop-version>2.7.0</hadoop-version>
   </properties>
   <build>
   </build>

http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
----------------------------------------------------------------------
diff --git a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
index 80603b5..c7ae952 100644
--- a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
+++ b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.json.simple.JSONArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
 
@@ -42,7 +44,10 @@ import javax.security.auth.Subject;
  * Hdfs Business Delegate
  */
 public class HdfsApi {
-private final Configuration conf;
+  private final static Logger LOG =
+      LoggerFactory.getLogger(HdfsApi.class);
+
+  private final Configuration conf;
   private final Map<String, String> authParams;
 
   private FileSystem fs;
@@ -60,7 +65,7 @@ private final Configuration conf;
     conf = configurationBuilder.buildConfig();
     ugi = UserGroupInformation.createProxyUser(username, getProxyUser());
 
-    fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+    fs = execute(new PrivilegedExceptionAction<FileSystem>() {
       public FileSystem run() throws IOException {
         return FileSystem.get(conf);
       }
@@ -100,7 +105,7 @@ private final Configuration conf;
    */
   public FileStatus[] listdir(final String path) throws FileNotFoundException,
       IOException, InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<FileStatus[]>() {
+    return execute(new PrivilegedExceptionAction<FileStatus[]>() {
       public FileStatus[] run() throws FileNotFoundException, Exception {
         return fs.listStatus(new Path(path));
       }
@@ -117,7 +122,7 @@ private final Configuration conf;
    */
   public FileStatus getFileStatus(final String path) throws IOException,
       FileNotFoundException, InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<FileStatus>() {
+    return execute(new PrivilegedExceptionAction<FileStatus>() {
       public FileStatus run() throws FileNotFoundException, IOException {
         return fs.getFileStatus(new Path(path));
       }
@@ -133,7 +138,7 @@ private final Configuration conf;
    */
   public boolean mkdir(final String path) throws IOException,
       InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+    return execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws Exception {
         return fs.mkdirs(new Path(path));
       }
@@ -150,7 +155,7 @@ private final Configuration conf;
    */
   public boolean rename(final String src, final String dst) throws IOException,
       InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+    return execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws Exception {
         return fs.rename(new Path(src), new Path(dst));
       }
@@ -163,7 +168,7 @@ private final Configuration conf;
    * @throws Exception
    */
   public boolean trashEnabled() throws Exception {
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+    return execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws IOException {
         Trash tr = new Trash(fs, conf);
         return tr.isEnabled();
@@ -177,7 +182,7 @@ private final Configuration conf;
    * @throws Exception
    */
   public Path getHomeDir() throws Exception {
-    return ugi.doAs(new PrivilegedExceptionAction<Path>() {
+    return execute(new PrivilegedExceptionAction<Path>() {
       public Path run() throws IOException {
         return fs.getHomeDirectory();
       }
@@ -190,7 +195,7 @@ private final Configuration conf;
    * @throws Exception
    */
   public synchronized FsStatus getStatus() throws Exception {
-    return ugi.doAs(new PrivilegedExceptionAction<FsStatus>() {
+    return execute(new PrivilegedExceptionAction<FsStatus>() {
       public FsStatus run() throws IOException {
         return fs.getStatus();
       }
@@ -203,7 +208,7 @@ private final Configuration conf;
    * @throws Exception
    */
   public Path getTrashDir() throws Exception {
-    return ugi.doAs(new PrivilegedExceptionAction<Path>() {
+    return execute(new PrivilegedExceptionAction<Path>() {
       public Path run() throws IOException {
         TrashPolicy trashPolicy = TrashPolicy.getInstance(conf, fs,
             fs.getHomeDirectory());
@@ -246,7 +251,7 @@ private final Configuration conf;
    * @throws Exception
    */
   public Void emptyTrash() throws Exception {
-    return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+    return execute(new PrivilegedExceptionAction<Void>() {
       public Void run() throws IOException {
         Trash tr = new Trash(fs, conf);
         tr.expunge();
@@ -264,7 +269,7 @@ private final Configuration conf;
    */
   public boolean moveToTrash(final String path) throws IOException,
       InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+    return execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws Exception {
         return Trash.moveToAppropriateTrash(fs, new Path(path), conf);
       }
@@ -281,7 +286,7 @@ private final Configuration conf;
    */
   public boolean delete(final String path, final boolean recursive)
       throws IOException, InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+    return execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws Exception {
         return fs.delete(new Path(path), recursive);
       }
@@ -298,7 +303,7 @@ private final Configuration conf;
    */
   public FSDataOutputStream create(final String path, final boolean overwrite)
       throws IOException, InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
+    return execute(new PrivilegedExceptionAction<FSDataOutputStream>() {
       public FSDataOutputStream run() throws Exception {
         return fs.create(new Path(path), overwrite);
       }
@@ -314,7 +319,7 @@ private final Configuration conf;
    */
   public FSDataInputStream open(final String path) throws IOException,
       InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<FSDataInputStream>() {
+    return execute(new PrivilegedExceptionAction<FSDataInputStream>() {
       public FSDataInputStream run() throws Exception {
         return fs.open(new Path(path));
       }
@@ -330,7 +335,7 @@ private final Configuration conf;
    */
   public boolean chmod(final String path, final String permissions) throws IOException,
       InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+    return execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws Exception {
         try {
           fs.setPermission(new Path(path), FsPermission.valueOf(permissions));
@@ -349,8 +354,8 @@ private final Configuration conf;
    * @throws java.io.IOException
    * @throws InterruptedException
    */
-  public synchronized void copy(final String src, final String dest) throws IOException, InterruptedException, HdfsApiException {
-    boolean result = ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+  public void copy(final String src, final String dest) throws IOException, InterruptedException, HdfsApiException {
+    boolean result = execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws Exception {
         return FileUtil.copy(fs, new Path(src), fs, new Path(dest), false, conf);
       }
@@ -361,8 +366,8 @@ private final Configuration conf;
     }
   }
 
-  public synchronized boolean exists(final String newFilePath) throws IOException, InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+  public boolean exists(final String newFilePath) throws IOException, InterruptedException {
+    return execute(new PrivilegedExceptionAction<Boolean>() {
       public Boolean run() throws Exception {
         return fs.exists(new Path(newFilePath));
       }
@@ -370,6 +375,43 @@ private final Configuration conf;
   }
 
   /**
+   * Executes action on HDFS using doAs, retrying transient HDFS-1058 failures
+   * @param action strategy object
+   * @param <T> result type
+   * @return result of operation
+   * @throws IOException on a non-retryable failure or once 3 tries are used up
+   * @throws InterruptedException if the thread is interrupted
+   */
+  public <T> T execute(PrivilegedExceptionAction<T> action)
+      throws IOException, InterruptedException {
+    T result = null;
+
+    // Retry strategy applied here due to HDFS-1058. HDFS can throw random
+    // IOException about retrieving block from DN if concurrent read/write
+    // on specific file is performed (see details on HDFS-1058).
+    int tryNumber = 0;
+    boolean succeeded = false;
+    do {
+      tryNumber += 1;
+      try {
+        result = ugi.doAs(action);
+        succeeded = true;
+      } catch (IOException ex) {
+        // getMessage() may be null; rethrow the original instead of an NPE.
+        String message = ex.getMessage();
+        if (message == null || !message.contains("Cannot obtain block length for")
+            || tryNumber >= 3) {
+          throw ex;
+        }
+        LOG.info("HDFS threw 'IOException: Cannot obtain block length' exception. " +
+            "Retrying... Try #" + (tryNumber + 1));
+        Thread.sleep(1000);  //retry after 1 second
+      }
+    } while (!succeeded);
+    return result;
+  }
+
+  /**
    * Converts a Hadoop permission into a Unix permission symbolic representation
    * (i.e. -rwxr--r--) or default if the permission is NULL.
    *