You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/07/01 00:21:15 UTC
falcon git commit: FALCON-2057 HiveDR not working with multiple users
and same DB
Repository: falcon
Updated Branches:
refs/heads/master a78795c3d -> d0bc18860
FALCON-2057 HiveDR not working with multiple users and same DB
Author: bvellanki <bv...@hortonworks.com>
Reviewers: "yzheng-hortonworks <yz...@hortonworks.com>, Sowmya Ramesh <sr...@hortonworks.com>, Venkat Ranganathan <ve...@hortonworks.com>"
Closes #203 from bvellanki/FALCON-2057
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d0bc1886
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d0bc1886
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d0bc1886
Branch: refs/heads/master
Commit: d0bc188601566dbf76555a115c7ea9c68ca13909
Parents: a78795c
Author: bvellanki <bv...@hortonworks.com>
Authored: Thu Jun 30 17:21:06 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Thu Jun 30 17:21:06 2016 -0700
----------------------------------------------------------------------
.../falcon/hive/util/HiveDRStatusStore.java | 23 +++++++++++++++++---
1 file changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0bc1886/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
index 900afe8..76eda87 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -71,6 +72,8 @@ public class HiveDRStatusStore extends DRStatusStore {
Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
FileUtils.validatePath(fileSystem, basePath);
+ // Current limitation is that only users who belong to DRStatusStore.storeGroup can submit HiveDR jobs.
+ // BaseDir for status store is created with permissions 770 so that all eligible users can access statusStore.
Path storePath = new Path(DEFAULT_STORE_PATH);
if (!fileSystem.exists(storePath)) {
if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
@@ -163,10 +166,11 @@ public class HiveDRStatusStore extends DRStatusStore {
private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
String database) throws HiveReplicationException{
DBReplicationStatus dbReplicationStatus = null;
+ Path statusDbDirPath = getStatusDbDirPath(database);
Path statusDirPath = getStatusDirPath(database, jobName);
+
// check if database name or jobName can contain chars not allowed by hdfs dir/file naming.
// if yes, use md5 of the same for dir names. prefer to use actual db names for readability.
-
try {
if (fileSystem.exists(statusDirPath)) {
dbReplicationStatus = readStatusFile(statusDirPath);
@@ -176,6 +180,15 @@ public class HiveDRStatusStore extends DRStatusStore {
ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
database, null, ReplicationStatus.Status.INIT, -1);
dbReplicationStatus = new DBReplicationStatus(initDbStatus);
+
+ // Create parent dir first with default status store permissions. FALCON-2057
+ if (!fileSystem.exists(statusDbDirPath)) {
+ if (!FileSystem.mkdirs(fileSystem, statusDbDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
+ String error = "mkdir failed for " + statusDbDirPath.toString();
+ LOG.error(error);
+ throw new HiveReplicationException(error);
+ }
+ }
if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
String error = "mkdir failed for " + statusDirPath.toString();
LOG.error(error);
@@ -197,7 +210,11 @@ public class HiveDRStatusStore extends DRStatusStore {
}
public Path getStatusDirPath(String database, String jobName) {
- return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName);
+ return new Path(getStatusDbDirPath(database), jobName);
+ }
+
+ public Path getStatusDbDirPath(String dbName) {
+ return new Path(new Path(BASE_DEFAULT_STORE_PATH), dbName.toLowerCase());
}
private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
@@ -271,7 +288,7 @@ public class HiveDRStatusStore extends DRStatusStore {
public void checkForReplicationConflict(String newSource, String jobName,
String database, String table) throws HiveReplicationException {
try {
- Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json");
+ Path globPath = new Path(getStatusDbDirPath(database), "*" + File.separator + "latest.json");
FileStatus[] files = fileSystem.globStatus(globPath);
for(FileStatus file : files) {
DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(