You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/12 15:09:32 UTC
[20/24] carbondata git commit: [CARBONDATA-1277] Dictionary generation failure due to hdfs lease expiry
[CARBONDATA-1277] Dictionary generation failure due to hdfs lease expiry
This closes #1147
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/285ce72d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/285ce72d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/285ce72d
Branch: refs/heads/streaming_ingest
Commit: 285ce72d4c9b3364bbdc454f4b6b331b3caa42db
Parents: 8b31f09
Author: manishgupta88 <to...@gmail.com>
Authored: Sat Jul 8 15:46:25 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Jul 11 18:00:18 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 7 +
.../core/datastore/impl/FileFactory.java | 21 ++
.../AtomicFileOperationsImpl.java | 5 +-
.../apache/carbondata/core/util/CarbonUtil.java | 2 +-
.../core/util/path/HDFSLeaseUtils.java | 215 +++++++++++++++++++
.../core/writer/CarbonDictionaryWriterImpl.java | 20 +-
.../carbondata/core/writer/ThriftWriter.java | 2 +-
7 files changed, 266 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 208bab8..8110abb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1287,6 +1287,13 @@ public final class CarbonCommonConstants {
public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
+  /**
+   * Property key for the number of lease-recovery attempts made by HDFSLeaseUtils.
+   * Values outside the range 1..50 (or non-numeric values) fall back to the default of 5.
+   */
+ @CarbonProperty
+ public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
+ "carbon.lease.recovery.retry.count";
+  /**
+   * Property key for the wait, in milliseconds, between lease-recovery attempts made by
+   * HDFSLeaseUtils. Values outside the range 1000..10000 (or non-numeric values) fall back to
+   * the default of 1000.
+   */
+ @CarbonProperty
+ public static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL =
+ "carbon.lease.recovery.retry.interval";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 7acd6b1..2a35ab3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -518,4 +518,25 @@ public final class FileFactory {
}
}
+  /**
+   * Wraps a string location in a Hadoop {@link Path}.
+   *
+   * @param filePath location of the file
+   * @return a new {@code Path} built from {@code filePath}
+   */
+  public static Path getPath(String filePath) {
+    Path wrapped = new Path(filePath);
+    return wrapped;
+  }
+
+  /**
+   * Obtains the Hadoop {@link FileSystem} that owns {@code path}, using this factory's static
+   * {@code configuration}.
+   *
+   * @param path path whose file system is required
+   * @return the file system instance responsible for {@code path}
+   * @throws IOException if the file system cannot be obtained
+   */
+  public static FileSystem getFileSystem(Path path) throws IOException {
+    FileSystem owningFileSystem = path.getFileSystem(configuration);
+    return owningFileSystem;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
index befc76e..61690ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.util.CarbonUtil;
public class AtomicFileOperationsImpl implements AtomicFileOperations {
@@ -67,10 +68,8 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations {
@Override public void close() throws IOException {
if (null != dataOutStream) {
- dataOutStream.close();
-
+ CarbonUtil.closeStream(dataOutStream);
CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-
if (!tempFile.renameForce(filePath)) {
throw new IOException("temporary file renaming failed, src="
+ tempFile.getPath() + ", dest=" + filePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8298600..06b2a61 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -128,7 +128,7 @@ public final class CarbonUtil {
try {
closeStream(stream);
} catch (IOException e) {
- LOGGER.error("Error while closing stream:" + e);
+ LOGGER.error(e, "Error while closing stream:" + e);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
new file mode 100644
index 0000000..c72c322
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+
+/**
+ * Utility methods for recovering the HDFS lease on a file.
+ *
+ * <p>{@link #checkExceptionMessageForLeaseRecovery(String)} decides whether an I/O failure looks
+ * like a lease problem, and {@link #recoverFileLease(String)} asks the
+ * {@link DistributedFileSystem} backing the file to recover its lease, retrying according to the
+ * {@code carbon.lease.recovery.retry.count} and {@code carbon.lease.recovery.retry.interval}
+ * properties.
+ */
+public class HDFSLeaseUtils {
+
+  private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN = 1;
+  private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX = 50;
+  private static final String CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT = "5";
+  private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN = 1000;
+  private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX = 10000;
+  private static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT = "1000";
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HDFSLeaseUtils.class.getName());
+
+  private HDFSLeaseUtils() {
+    // static utility class; not meant to be instantiated
+  }
+
+  /**
+   * Checks whether an exception message indicates a failure that lease recovery may resolve.
+   * Currently only a failed append ("Failed to APPEND_FILE") is recognised.
+   *
+   * @param message exception message to inspect; may be {@code null}
+   * @return {@code true} if the message matches a known lease-recovery case
+   */
+  public static boolean checkExceptionMessageForLeaseRecovery(String message) {
+    // depending on the scenario few more cases can be added for validating lease recovery exception
+    return null != message && message.contains("Failed to APPEND_FILE");
+  }
+
+  /**
+   * Attempts to recover the lease on the given file.
+   *
+   * <p>HDFS and Alluxio paths are looked up directly; viewfs paths are first resolved to their
+   * target file system. Recovery is only attempted when the (resolved) file system is a
+   * {@link DistributedFileSystem}; for any other file system or file type an error is logged and
+   * {@code false} is returned.
+   *
+   * @param filePath path of the file whose lease should be recovered
+   * @return {@code true} if the lease was recovered, {@code false} otherwise
+   * @throws IOException if the file does not exist, if the file system cannot be obtained, or if
+   *     no attempt succeeded and at least one attempt failed with an I/O error (the most recent
+   *     such error is rethrown)
+   */
+  public static boolean recoverFileLease(String filePath) throws IOException {
+    LOGGER.info("Trying to recover lease on file: " + filePath);
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    switch (fileType) {
+      case ALLUXIO:
+      case HDFS: {
+        Path path = FileFactory.getPath(filePath);
+        return recoverLeaseIfSupported(filePath, path, FileFactory.getFileSystem(path));
+      }
+      case VIEWFS: {
+        Path path = FileFactory.getPath(filePath);
+        ViewFileSystem viewFileSystem = (ViewFileSystem) FileFactory.getFileSystem(path);
+        // The resolved target path must be handed to the target file system: a
+        // DistributedFileSystem rejects paths whose scheme/authority belong to another file
+        // system, such as the original viewfs:// path.
+        Path targetFileSystemPath = viewFileSystem.resolvePath(path);
+        FileSystem targetFileSystem = FileFactory.getFileSystem(targetFileSystemPath);
+        return recoverLeaseIfSupported(filePath, targetFileSystemPath, targetFileSystem);
+      }
+      default:
+        logLeaseRecoveryNotSupported(filePath);
+        return false;
+    }
+  }
+
+  /**
+   * Recovers the lease when {@code fs} is a {@link DistributedFileSystem}; otherwise logs an
+   * error and returns {@code false} instead of failing with a ClassCastException.
+   */
+  private static boolean recoverLeaseIfSupported(String filePath, Path path, FileSystem fs)
+      throws IOException {
+    if (fs instanceof DistributedFileSystem) {
+      return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs);
+    }
+    logLeaseRecoveryNotSupported(filePath);
+    return false;
+  }
+
+  /** Logs that lease recovery is not possible for the file system holding {@code filePath}. */
+  private static void logLeaseRecoveryNotSupported(String filePath) {
+    LOGGER.error("Invalid file type. Lease recovery is not supported on filesystem with file: "
+        + filePath);
+  }
+
+  /**
+   * Calls {@link DistributedFileSystem#recoverLease(Path)} until it reports success or the
+   * configured number of attempts is used up, waiting the configured interval between attempts.
+   *
+   * @param filePath original file path, used for logging
+   * @param path path understood by {@code fs}
+   * @param fs distributed file system that owns {@code path}
+   * @return {@code true} if the lease was recovered
+   * @throws IOException if the file does not exist, or if no attempt succeeded and at least one
+   *     attempt failed with an I/O error (the most recent one is rethrown)
+   */
+  private static boolean recoverLeaseOnFile(String filePath, Path path, DistributedFileSystem fs)
+      throws IOException {
+    int maxAttempts = getLeaseRecoveryRetryCount();
+    int retryInterval = getLeaseRecoveryRetryInterval();
+    boolean leaseRecovered = false;
+    IOException lastException = null;
+    int attemptsMade = 0;
+    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+      attemptsMade = attempt;
+      try {
+        leaseRecovered = fs.recoverLease(path);
+      } catch (IOException e) {
+        if (isFileNotFound(e)) {
+          // retrying cannot help when the file itself is missing
+          LOGGER.error("The given file does not exist at path " + filePath);
+          throw e;
+        }
+        LOGGER.error(e, "Recover lease threw exception : " + e.getMessage());
+        lastException = e;
+      }
+      // stop on success, and do not wait after the final attempt
+      if (leaseRecovered || attempt == maxAttempts) {
+        break;
+      }
+      LOGGER.info("Failed to recover lease after attempt " + attempt + " . Will try again after "
+          + retryInterval + " ms...");
+      try {
+        Thread.sleep(retryInterval);
+      } catch (InterruptedException e) {
+        // restore the interrupt status and stop retrying so the caller can observe it
+        Thread.currentThread().interrupt();
+        LOGGER.error(e,
+            "Interrupted exception occurred while recovering lease for file : " + filePath);
+        break;
+      }
+    }
+    if (leaseRecovered) {
+      LOGGER.info("Successfully able to recover lease on file: " + filePath);
+      return true;
+    }
+    LOGGER.error("Failed to recover lease on file: " + filePath + " after " + attemptsMade
+        + " attempt(s) at an interval of " + retryInterval + " ms");
+    if (null != lastException) {
+      throw lastException;
+    }
+    return false;
+  }
+
+  /**
+   * Returns whether the exception reports that the file itself does not exist. The message may
+   * be {@code null}, so it is checked before use.
+   */
+  private static boolean isFileNotFound(IOException e) {
+    if (e instanceof FileNotFoundException) {
+      return true;
+    }
+    String message = e.getMessage();
+    return e instanceof LeaseExpiredException && null != message
+        && message.contains("File does not exist");
+  }
+
+  /** Number of lease-recovery attempts: 1..50, default 5. */
+  private static int getLeaseRecoveryRetryCount() {
+    return getConfiguredIntInRange(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT,
+        CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT, CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN,
+        CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX, "");
+  }
+
+  /** Wait between lease-recovery attempts in milliseconds: 1000..10000, default 1000. */
+  private static int getLeaseRecoveryRetryInterval() {
+    return getConfiguredIntInRange(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL,
+        CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT, CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN,
+        CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX, " (ms)");
+  }
+
+  /**
+   * Reads an integer carbon property and falls back to its default, with a warning, when the
+   * configured value is not a number or lies outside {@code [min, max]}.
+   *
+   * @param propertyKey carbon property key
+   * @param defaultValue default value, as a string
+   * @param min smallest accepted value (inclusive)
+   * @param max largest accepted value (inclusive)
+   * @param unitSuffix text inserted after "default value" in warnings, e.g. {@code " (ms)"}
+   * @return the configured value if valid, otherwise the parsed default
+   */
+  private static int getConfiguredIntInRange(String propertyKey, String defaultValue, int min,
+      int max, String unitSuffix) {
+    int fallback = Integer.parseInt(defaultValue);
+    String configured = CarbonProperties.getInstance().getProperty(propertyKey, defaultValue);
+    try {
+      int value = Integer.parseInt(configured);
+      if (value >= min && value <= max) {
+        return value;
+      }
+      LOGGER.warn("value configured for " + propertyKey
+          + " is not in allowed range. Allowed range is >=" + min + " and <=" + max
+          + ". Therefore considering default value" + unitSuffix + ": " + fallback);
+    } catch (NumberFormatException ne) {
+      LOGGER.warn("value configured for " + propertyKey
+          + " is incorrect. Therefore considering default value" + unitSuffix + ": " + fallback);
+    }
+    return fallback;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
index 9de41e1..64ff202 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.service.CarbonCommonFactory;
import org.apache.carbondata.core.service.PathService;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.util.path.HDFSLeaseUtils;
import org.apache.carbondata.format.ColumnDictionaryChunk;
import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
@@ -359,7 +360,24 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
// create thrift writer instance
dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true);
// open the file stream
- dictionaryThriftWriter.open();
+ try {
+ dictionaryThriftWriter.open();
+ } catch (IOException e) {
+ // Cases to handle
+ // 1. Handle File lease recovery
+ if (HDFSLeaseUtils.checkExceptionMessageForLeaseRecovery(e.getMessage())) {
+ LOGGER.error(e, "Lease recovery exception encountered for file: " + dictionaryFile);
+ boolean leaseRecovered = HDFSLeaseUtils.recoverFileLease(dictionaryFile);
+ if (leaseRecovered) {
+ // try to open output stream again after recovering the lease on file
+ dictionaryThriftWriter.open();
+ } else {
+ throw e;
+ }
+ } else {
+ throw e;
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
index 9bf549d..d7b1a0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
@@ -136,7 +136,7 @@ public class ThriftWriter {
*/
public void close() throws IOException {
closeAtomicFileWriter();
- CarbonUtil.closeStreams(dataOutputStream);
+    // closeStream lets an IOException raised while closing reach the caller of close();
+    // closeStreams would catch it and only log it.
+ CarbonUtil.closeStream(dataOutputStream);
}
/**