You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/12 15:09:32 UTC

[20/24] carbondata git commit: [CARBONDATA-1277] Dictionary generation failure due to hdfs lease expiry

[CARBONDATA-1277] Dictionary generation failure due to hdfs lease expiry

This closes #1147


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/285ce72d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/285ce72d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/285ce72d

Branch: refs/heads/streaming_ingest
Commit: 285ce72d4c9b3364bbdc454f4b6b331b3caa42db
Parents: 8b31f09
Author: manishgupta88 <to...@gmail.com>
Authored: Sat Jul 8 15:46:25 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Jul 11 18:00:18 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   7 +
 .../core/datastore/impl/FileFactory.java        |  21 ++
 .../AtomicFileOperationsImpl.java               |   5 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   2 +-
 .../core/util/path/HDFSLeaseUtils.java          | 215 +++++++++++++++++++
 .../core/writer/CarbonDictionaryWriterImpl.java |  20 +-
 .../carbondata/core/writer/ThriftWriter.java    |   2 +-
 7 files changed, 266 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 208bab8..8110abb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1287,6 +1287,13 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
 
+  /** Lease recovery retry count (allowed 1-50, default 5). */ @CarbonProperty
+  public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
+      "carbon.lease.recovery.retry.count";
+  /** Lease recovery retry interval in ms (allowed 1000-10000, default 1000). */ @CarbonProperty
+  public static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL =
+      "carbon.lease.recovery.retry.interval";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 7acd6b1..2a35ab3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -518,4 +518,25 @@ public final class FileFactory {
     }
   }
 
+  /**
+   * Creates a Hadoop Path object for the given file path string.
+   *
+   * @param filePath file path to wrap
+   * @return new Path instance built from filePath
+   */
+  public static Path getPath(String filePath) {
+    return new Path(filePath);
+  }
+
+  /**
+   * Returns the file system for the given path, obtained using this class's configuration.
+   *
+   * @param path path whose file system is required
+   * @return file system instance returned by path.getFileSystem(configuration)
+   * @throws IOException if the file system cannot be obtained
+   */
+  public static FileSystem getFileSystem(Path path) throws IOException {
+    return path.getFileSystem(configuration);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
index befc76e..61690ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 public class AtomicFileOperationsImpl implements AtomicFileOperations {
 
@@ -67,10 +68,8 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations {
   @Override public void close() throws IOException {
 
     if (null != dataOutStream) {
-      dataOutStream.close();
-
+      CarbonUtil.closeStream(dataOutStream);
       CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-
       if (!tempFile.renameForce(filePath)) {
         throw new IOException("temporary file renaming failed, src="
             + tempFile.getPath() + ", dest=" + filePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8298600..06b2a61 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -128,7 +128,7 @@ public final class CarbonUtil {
         try {
           closeStream(stream);
         } catch (IOException e) {
-          LOGGER.error("Error while closing stream:" + e);
+          LOGGER.error(e, "Error while closing stream:" + e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
new file mode 100644
index 0000000..c72c322
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+
+/**
+ * Implementation for HDFS utility methods
+ */
+public final class HDFSLeaseUtils {
+
+  private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN = 1;
+  private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX = 50;
+  private static final String CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT = "5";
+  private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN = 1000;
+  private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX = 10000;
+  private static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT = "1000";
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HDFSLeaseUtils.class.getName());
+
+  private HDFSLeaseUtils() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /**
+   * Checks whether an exception message is one for which lease recovery should be attempted,
+   * i.e. whether it reports a failed APPEND_FILE operation.
+   *
+   * @param message exception message, may be null
+   * @return true if the message contains "Failed to APPEND_FILE"
+   */
+  public static boolean checkExceptionMessageForLeaseRecovery(String message) {
+    // depending on the scenario few more cases can be added for validating lease recovery exception
+    return null != message && message.contains("Failed to APPEND_FILE");
+  }
+
+  /**
+   * Makes attempts to recover the lease on a file using the distributed file system utility.
+   * HDFS/ALLUXIO paths and VIEWFS paths are supported, provided the (resolved) file system
+   * is a DistributedFileSystem.
+   *
+   * @param filePath file whose lease has to be recovered
+   * @return true if the lease was recovered; false if it could not be recovered or if lease
+   *         recovery is not supported for the file system of the file
+   * @throws IOException if the file does not exist or recovery failed with an exception
+   */
+  public static boolean recoverFileLease(String filePath) throws IOException {
+    LOGGER.info("Trying to recover lease on file: " + filePath);
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    Path path = null;
+    FileSystem fs = null;
+    switch (fileType) {
+      case ALLUXIO:
+      case HDFS:
+        path = FileFactory.getPath(filePath);
+        fs = FileFactory.getFileSystem(path);
+        break;
+      case VIEWFS:
+        Path viewFsPath = FileFactory.getPath(filePath);
+        ViewFileSystem viewFileSystem = (ViewFileSystem) FileFactory.getFileSystem(viewFsPath);
+        // DistributedFileSystem rejects viewfs:// paths ("Wrong FS"), so recovery has to use
+        // the resolved target path together with the target file system
+        path = viewFileSystem.resolvePath(viewFsPath);
+        fs = FileFactory.getFileSystem(path);
+        break;
+      default:
+        break;
+    }
+    // the file system of an ALLUXIO/HDFS path is not necessarily a DistributedFileSystem
+    if (fs instanceof DistributedFileSystem) {
+      return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs);
+    }
+    LOGGER.error("Invalid file type. Lease recovery is not supported on filesystem with file: "
+        + filePath);
+    return false;
+  }
+
+  /**
+   * Recovers the lease on a file. Up to the configured number of attempts are made, waiting
+   * the configured interval (ms) between two consecutive attempts; retrying stops as soon as
+   * the lease is recovered or the thread is interrupted.
+   *
+   * @param filePath file path, used for logging
+   * @param path path of the file on the given file system
+   * @param dfs distributed file system on which lease recovery is invoked
+   * @return true if the lease was recovered, false otherwise
+   * @throws IOException if the file does not exist, or if recovery did not succeed and an
+   *                     attempt failed with an IOException (the last one is rethrown)
+   */
+  private static boolean recoverLeaseOnFile(String filePath, Path path, DistributedFileSystem dfs)
+      throws IOException {
+    int maxAttempts = getLeaseRecoveryRetryCount();
+    int retryInterval = getLeaseRecoveryRetryInterval();
+    IOException ioException = null;
+    int attemptsMade = 0;
+    while (attemptsMade < maxAttempts) {
+      attemptsMade++;
+      try {
+        if (dfs.recoverLease(path)) {
+          LOGGER.info("Successfully able to recover lease on file: " + filePath);
+          return true;
+        }
+        LOGGER.info("Failed to recover lease after attempt " + attemptsMade
+            + " . Will try again after " + retryInterval + " ms...");
+      } catch (IOException e) {
+        if (isFileNotFound(e)) {
+          LOGGER.error("The given file does not exist at path " + filePath);
+          throw e;
+        }
+        LOGGER.error(e, "Recover lease threw exception : " + e.getMessage());
+        ioException = e;
+      }
+      if (attemptsMade < maxAttempts) {
+        try {
+          // wait before every retry, including after an attempt that threw an exception
+          Thread.sleep(retryInterval);
+        } catch (InterruptedException e) {
+          // restore the interrupt status so that callers can observe it, and stop retrying
+          Thread.currentThread().interrupt();
+          LOGGER.error(e,
+              "Interrupted exception occurred while recovering lease for file : " + filePath);
+          break;
+        }
+      }
+    }
+    LOGGER.error("Failed to recover lease on file: " + filePath + " after " + attemptsMade
+        + " attempt(s) at an interval of " + retryInterval + " ms");
+    if (null != ioException) {
+      throw ioException;
+    }
+    return false;
+  }
+
+  /**
+   * Returns true if the exception thrown by recoverLease signals that the file does not exist.
+   */
+  private static boolean isFileNotFound(IOException e) {
+    return e instanceof FileNotFoundException || (e instanceof LeaseExpiredException
+        && null != e.getMessage() && e.getMessage().contains("File does not exist"));
+  }
+
+  private static int getLeaseRecoveryRetryCount() {
+    return getIntPropertyInRange(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT,
+        CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT, CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN,
+        CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX, "");
+  }
+
+  private static int getLeaseRecoveryRetryInterval() {
+    return getIntPropertyInRange(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL,
+        CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT, CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN,
+        CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX, " (ms)");
+  }
+
+  /**
+   * Reads an integer carbon property, falling back to the given default when the configured
+   * value is not a valid integer or lies outside the inclusive range [min, max].
+   *
+   * @param key property name
+   * @param defaultValue default value, must be a parsable integer
+   * @param min smallest allowed value
+   * @param max largest allowed value
+   * @param unit unit suffix used in warning messages, e.g. " (ms)"
+   * @return configured value if valid, otherwise the default value
+   */
+  private static int getIntPropertyInRange(String key, String defaultValue, int min, int max,
+      String unit) {
+    String configured = CarbonProperties.getInstance().getProperty(key, defaultValue);
+    int fallback = Integer.parseInt(defaultValue);
+    try {
+      int value = Integer.parseInt(configured);
+      if (value >= min && value <= max) {
+        return value;
+      }
+      LOGGER.warn("value configured for " + key + " is not in allowed range. Allowed range is >="
+          + min + " and <=" + max + ". Therefore considering default value" + unit + ": "
+          + fallback);
+    } catch (NumberFormatException ne) {
+      LOGGER.warn("value configured for " + key
+          + " is incorrect. Therefore considering default value" + unit + ": " + fallback);
+    }
+    return fallback;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
index 9de41e1..64ff202 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.service.PathService;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.util.path.HDFSLeaseUtils;
 import org.apache.carbondata.format.ColumnDictionaryChunk;
 import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
 
@@ -359,7 +360,24 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
     // create thrift writer instance
     dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true);
     // open the file stream
-    dictionaryThriftWriter.open();
+    try {
+      dictionaryThriftWriter.open();
+    } catch (IOException e) {
+      // Cases to handle
+      // 1. Handle File lease recovery
+      if (HDFSLeaseUtils.checkExceptionMessageForLeaseRecovery(e.getMessage())) {
+        LOGGER.error(e, "Lease recovery exception encountered for file: " + dictionaryFile);
+        boolean leaseRecovered = HDFSLeaseUtils.recoverFileLease(dictionaryFile);
+        if (leaseRecovered) {
+          // try to open output stream again after recovering the lease on file
+          dictionaryThriftWriter.open();
+        } else {
+          throw e;
+        }
+      } else {
+        throw e;
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
index 9bf549d..d7b1a0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
@@ -136,7 +136,7 @@ public class ThriftWriter {
    */
   public void close() throws IOException {
     closeAtomicFileWriter();
-    CarbonUtil.closeStreams(dataOutputStream);
+    CarbonUtil.closeStream(dataOutputStream);
   }
 
   /**