You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/11 10:20:04 UTC

[4/5] falcon git commit: FALCON-1188 Falcon support for Hive Replication. Contributed by Venkat Ranganathan.

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
new file mode 100644
index 0000000..98449f0
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Copy committer class.
+ */
+public class CopyCommitter extends FileOutputCommitter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CopyCommitter.class);
+
+    /**
+     * Create a file output committer.
+     *
+     * @param outputPath the job's output path, or null if you want the output
+     *                   committer to act as a noop.
+     * @param context    the task's context
+     * @throws java.io.IOException
+     */
+    public CopyCommitter(Path outputPath,
+                         TaskAttemptContext context) throws IOException {
+        super(outputPath, context);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+        Configuration conf = jobContext.getConfiguration();
+
+        try {
+            super.commitJob(jobContext);
+        } finally {
+            cleanup(conf);
+        }
+    }
+
+    private void cleanup(Configuration conf) {
+        // clean up staging and other data
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
new file mode 100644
index 0000000..5eb8acb
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.mapreduce;
+
+import org.apache.falcon.hive.HiveDRArgs;
+import org.apache.falcon.hive.util.EventUtils;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Map class for Hive DR.
+ */
+public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
+    private EventUtils eventUtils;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        eventUtils = new EventUtils(context.getConfiguration());
+        eventUtils.initializeFS();
+        try {
+            eventUtils.setupConnection();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void map(LongWritable key, Text value,
+                       Context context) throws IOException, InterruptedException {
+        LOG.debug("Processing Event value: {}", value.toString());
+
+        try {
+            eventUtils.processEvents(value.toString());
+        } catch (Exception e) {
+            LOG.error("Exception in processing events:", e);
+            throw new IOException(e);
+        } finally {
+            cleanup(context);
+        }
+        List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();
+        if (replicationStatusList != null && !replicationStatusList.isEmpty()) {
+            for (ReplicationStatus rs : replicationStatusList) {
+                context.write(new Text(rs.getJobName()), new Text(rs.toString()));
+            }
+        }
+    }
+
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        LOG.info("Invoking cleanup process");
+        super.cleanup(context);
+        try {
+            if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
+                    .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
+                eventUtils.cleanEventsDirectory();
+            }
+        } catch (IOException e) {
+            LOG.error("Cleaning up of events directories failed", e);
+        } finally {
+            try {
+                eventUtils.closeConnection();
+            } catch (SQLException e) {
+                LOG.error("Closing the connections failed", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
new file mode 100644
index 0000000..50cb4b2
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.mapreduce;
+
+
+import org.apache.falcon.hive.HiveDRArgs;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.FileUtils;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Reducer class for Hive DR.
+ */
+public class CopyReducer extends Reducer<Text, Text, Text, Text> {
+    private DRStatusStore hiveDRStore;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        FileSystem fs= FileSystem.get(FileUtils.getConfiguration(
+                conf.get(HiveDRArgs.TARGET_NN.getName()),
+                conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
+        hiveDRStore = new HiveDRStatusStore(fs);
+    }
+
+    private List<ReplicationStatus> sortStatusList(List<ReplicationStatus> replStatusList) {
+        Collections.sort(replStatusList, new Comparator<ReplicationStatus>() {
+            @Override
+            public int compare(ReplicationStatus r1, ReplicationStatus r2) {
+                return (int) (r1.getEventId() - r2.getEventId());
+            }
+        });
+        return replStatusList;
+    }
+
+    @Override
+    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
+        ReplicationStatus rs;
+        try {
+            for (Text value : values) {
+                String[] fields = (value.toString()).split("\t");
+                rs = new ReplicationStatus(fields[0], fields[1], fields[2], fields[3], fields[4],
+                        ReplicationStatus.Status.valueOf(fields[5]), Long.parseLong(fields[6]));
+                replStatusList.add(rs);
+            }
+
+            hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList));
+        } catch (HiveReplicationException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
new file mode 100644
index 0000000..6dceb8e
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to store replication status of a DB and it's tables.
+ */
+public class DBReplicationStatus {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DBReplicationStatus.class);
+    private static final String DB_STATUS = "db_status";
+    private static final String TABLE_STATUS = "table_status";
+
+    private Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>();
+    private ReplicationStatus databaseStatus;
+
+    public DBReplicationStatus(ReplicationStatus dbStatus) throws HiveReplicationException {
+        setDatabaseStatus(dbStatus);
+    }
+
+    public DBReplicationStatus(ReplicationStatus dbStatus,
+                               Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException {
+        /*
+        The order of set method calls is important to ensure tables that do not belong to same db
+        are not added to this DBReplicationStatus
+         */
+        setDatabaseStatus(dbStatus);
+        setTableStatuses(tableStatuses);
+    }
+
+    // Serialize
+    public String toJsonString() throws HiveReplicationException {
+        JSONObject retObject = new JSONObject();
+        JSONObject tableStatus = new JSONObject();
+        try {
+            for (Map.Entry<String, ReplicationStatus> status : tableStatuses.entrySet()) {
+                tableStatus.put(status.getKey(), status.getValue().toJsonObject());
+            }
+            retObject.put(DB_STATUS, databaseStatus.toJsonObject());
+            retObject.put(TABLE_STATUS, tableStatus);
+            return retObject.toString(ReplicationStatus.INDENT_FACTOR);
+        } catch (JSONException e) {
+            throw new HiveReplicationException("Unable to serialize Database Replication Status", e);
+        }
+    }
+
+    // de-serialize
+    public DBReplicationStatus(String jsonString) throws HiveReplicationException {
+        try {
+            JSONObject object = new JSONObject(jsonString);
+            ReplicationStatus dbstatus = new ReplicationStatus(object.get(DB_STATUS).toString());
+            setDatabaseStatus(dbstatus);
+
+            JSONObject tableJson = object.getJSONObject(TABLE_STATUS);
+            Iterator keys = tableJson.keys();
+            while(keys.hasNext()) {
+                String key = keys.next().toString();
+                ReplicationStatus value = new ReplicationStatus(tableJson.get(key).toString());
+                if (value.getDatabase().equals(dbstatus.getDatabase())) {
+                    tableStatuses.put(key.toLowerCase(), value);
+                } else {
+                    throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString. "
+                            + "Cannot set status for table " + value.getDatabase() + "." + value.getTable()
+                            + ", It does not belong to DB " + dbstatus.getDatabase());
+                }
+            }
+        } catch (JSONException e) {
+            throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString", e);
+        }
+    }
+
+    public Map<String, ReplicationStatus> getTableStatuses() {
+        return tableStatuses;
+    }
+
+    public ReplicationStatus getTableStatus(String tableName) throws HiveReplicationException {
+        tableName = tableName.toLowerCase();
+        if (tableStatuses.containsKey(tableName)) {
+            return tableStatuses.get(tableName);
+        }
+        return new ReplicationStatus(databaseStatus.getSourceUri(), databaseStatus.getTargetUri(),
+                databaseStatus.getJobName(), databaseStatus.getDatabase(),
+                tableName, ReplicationStatus.Status.INIT, -1);
+    }
+
+    public Iterator<ReplicationStatus> getTableStatusIterator() {
+        List<ReplicationStatus> resultSet = new ArrayList<ReplicationStatus>();
+        for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
+            resultSet.add(entry.getValue());
+        }
+        return resultSet.iterator();
+    }
+
+    private void setTableStatuses(Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException {
+        for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
+            if (!entry.getValue().getDatabase().equals(databaseStatus.getDatabase())) {
+                throw new HiveReplicationException("Cannot set status for table " + entry.getValue().getDatabase()
+                        + "." + entry.getValue().getTable() + ", It does not belong to DB "
+                        + databaseStatus.getDatabase());
+            } else {
+                this.tableStatuses.put(entry.getKey().toLowerCase(), entry.getValue());
+            }
+        }
+    }
+
+    public ReplicationStatus getDatabaseStatus() {
+        return databaseStatus;
+    }
+
+    private void setDatabaseStatus(ReplicationStatus databaseStatus) {
+        this.databaseStatus = databaseStatus;
+    }
+
+    /**
+     * Update DB status from table statuses.
+            case 1) All tables replicated successfully.
+                Take the largest successful eventId and set dbReplStatus as success
+            case 2) One or many tables failed to replicate
+                Take the smallest eventId amongst the failed tables and set dbReplStatus as failed.
+     */
+    public void updateDbStatusFromTableStatuses() throws HiveReplicationException {
+        if (tableStatuses.size() == 0) {
+            // nothing to do
+            return;
+        }
+
+        databaseStatus.setStatus(ReplicationStatus.Status.SUCCESS);
+        long successEventId = databaseStatus.getEventId();
+        long failedEventId = -1;
+
+        for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
+            long eventId = entry.getValue().getEventId();
+            if (entry.getValue().getStatus().equals(ReplicationStatus.Status.SUCCESS)) {
+                if (eventId > successEventId) {
+                    successEventId = eventId;
+                }
+            } else if (entry.getValue().getStatus().equals(ReplicationStatus.Status.FAILURE)) {
+                databaseStatus.setStatus(ReplicationStatus.Status.FAILURE);
+                if (eventId < failedEventId || failedEventId == -1) {
+                    failedEventId = eventId;
+                }
+            } //else , if table status is Status.INIT, it should not change lastEventId of DB
+        }
+
+        String log = "Updating DB Status based on table replication status. Status : "
+                + databaseStatus.getStatus().toString() + ", eventId : ";
+        if (databaseStatus.getStatus().equals(ReplicationStatus.Status.SUCCESS)) {
+            databaseStatus.setEventId(successEventId);
+            LOG.info(log + String.valueOf(successEventId));
+        } else if (databaseStatus.getStatus().equals(ReplicationStatus.Status.FAILURE)) {
+            databaseStatus.setEventId(failedEventId);
+            LOG.error(log + String.valueOf(failedEventId));
+        }
+
+    }
+
+    public void updateDbStatus(ReplicationStatus status) throws HiveReplicationException {
+        if (StringUtils.isNotEmpty(status.getTable())) {
+            throw new HiveReplicationException("Cannot update DB Status. This is table level status.");
+        }
+
+        if (this.databaseStatus.getDatabase().equals(status.getDatabase())) {
+            this.databaseStatus = status;
+        } else {
+            throw new HiveReplicationException("Cannot update Database Status. StatusDB "
+                    + status.getDatabase() + " does not match current DB "
+                    +  this.databaseStatus.getDatabase());
+        }
+    }
+
+    public void updateTableStatus(ReplicationStatus status) throws HiveReplicationException {
+        if (StringUtils.isEmpty(status.getTable())) {
+            throw new HiveReplicationException("Cannot update Table Status. Table name is empty.");
+        }
+
+        if (this.databaseStatus.getDatabase().equals(status.getDatabase())) {
+            this.tableStatuses.put(status.getTable(), status);
+        } else {
+            throw new HiveReplicationException("Cannot update Table Status. TableDB "
+                    + status.getDatabase() + " does not match current DB "
+                    +  this.databaseStatus.getDatabase());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
new file mode 100644
index 0000000..cf6b7ad
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract class for Data Replication Status Store.
+ */
+public abstract class DRStatusStore {
+
+    public static final String BASE_DEFAULT_STORE_PATH = "/apps/data-mirroring/";
+    public static final FsPermission DEFAULT_STORE_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
+
+    private static String storeGroup = "users";
+
+
+    /**
+     * Update replication status of a table(s)/db after replication job jobName completes.
+     * @param jobName Name of the replication job.
+     * @param statusList List of replication statuses of db/tables replicated by jobName.
+     */
+    public abstract void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
+        throws HiveReplicationException;
+
+    /**
+     * Get Replication status for a database.
+     * @param source Replication source uri.
+     * @param target Replication target uri.
+     * @param jobName Name of the replication job.
+     * @param database Name of the target database.
+     * @return ReplicationStatus
+     * destination commands for each table
+     */
+    public abstract ReplicationStatus getReplicationStatus(String source, String target,
+                                                           String jobName, String database)
+        throws HiveReplicationException;
+
+    /**
+     * Get Replication status for a table.
+     * @param source Replication source uri.
+     * @param target Replication target uri.
+     * @param jobName Name of the replication job.
+     * @param database Name of the target database.
+     * @param table Name of the target table.
+     * @return ReplicationStatus
+     * destination commands for each table
+     */
+    public abstract ReplicationStatus getReplicationStatus(String source, String target,
+                                                           String jobName, String database,
+                                                           String table) throws HiveReplicationException;
+
+    /**
+     * Get Replication status of all tables in a database.
+     * @param source Replication source uri.
+     * @param target Replication target uri.
+     * @param jobName Name of the replication job.
+     * @param database Name of the target database.
+     * @return Iterator
+     * destination commands for each table
+     */
+    public abstract Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
+                                                                                String jobName, String database)
+        throws HiveReplicationException;
+
+
+    /**
+     * Delete a replication job.
+     * @param jobName Name of the replication job.
+     * @param database Name of the target database.
+     * destination commands for each table
+     */
+    public abstract void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException;
+
+    public static String getStoreGroup() {
+        return storeGroup;
+    }
+
+    public static void setStoreGroup(String group) {
+        storeGroup = group;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
new file mode 100644
index 0000000..3b3156f
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+/**
+ * Public delimiters used for event processing.
+ */
+public final class DelimiterUtils {
+    public static final String FIELD_DELIM = "\u0001";
+    public static final String NEWLINE_DELIM = System.getProperty("line.separator");
+    public static final String TAB_DELIM = "\t";
+
+    private DelimiterUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
new file mode 100644
index 0000000..fb695d0
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.ReplicationEventMetadata;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Utility methods for event sourcer.
+ */
+public class EventSourcerUtils {
+
+    private static final String METAFILE_EXTENSION = ".meta";
+    private static final String SRCFILE_EXTENSION = ".src";
+    private static final String TGTFILE_EXTENSION = ".tgt";
+    private Path eventsInputDirPath;
+    private final boolean shouldKeepHistory;
+    private final FileSystem jobFS;
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventSourcerUtils.class);
+
+    public EventSourcerUtils(final Configuration conf, final boolean shouldKeepHistory,
+                             final String jobName) throws Exception {
+        this.shouldKeepHistory = shouldKeepHistory;
+        jobFS = FileSystem.get(conf);
+        init(jobName);
+    }
+
+    private void init(final String jobName) throws Exception {
+        // Create base dir to store events on cluster where job is running
+        Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
+        // Validate base path
+        FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));
+
+        if (!jobFS.exists(dir)) {
+            if (!jobFS.mkdirs(dir)) {
+                throw new Exception("Creating directory failed: " + dir);
+            }
+        }
+
+        eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName);
+
+        if (!jobFS.exists(eventsInputDirPath)) {
+            if (!jobFS.mkdirs(eventsInputDirPath)) {
+                throw new Exception("Creating directory failed: " + eventsInputDirPath);
+            }
+        }
+    }
+
+    public OutputStream getFileOutputStream(final String path) throws Exception {
+        return FileSystem.create(jobFS, new Path(path), FileUtils.FS_PERMISSION_700);
+    }
+
+    public void closeOutputStream(OutputStream out) throws IOException {
+        if (out != null) {
+            try {
+                out.flush();
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+        }
+    }
+
+    public void persistReplicationEvents(final OutputStream out,
+                                         final java.lang.Iterable
+                                                 <? extends org.apache.hive.hcatalog.api.repl.Command> cmds)
+        throws Exception {
+        for (Command cmd : cmds) {
+            persistReplicationEvents(out, cmd);
+        }
+    }
+
+    public void persistReplicationEvents(final OutputStream out,
+                                         final Command cmd) throws Exception {
+        out.write(ReplicationUtils.serializeCommand(cmd).getBytes());
+        LOG.debug("HiveDR Serialized Repl Command : {}", cmd);
+        out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
+    }
+
+    public String persistToMetaFile(final ReplicationEventMetadata data, final String identifier) throws IOException {
+        if (data != null && data.getEventFileMetadata() != null && !data.getEventFileMetadata().isEmpty()) {
+            Path metaFilename = new Path(eventsInputDirPath.toString(), identifier + METAFILE_EXTENSION);
+            OutputStream out = null;
+
+            try {
+                out = FileSystem.create(jobFS, metaFilename, FileUtils.FS_PERMISSION_700);
+
+                for (Map.Entry<String, String> entry : data.getEventFileMetadata().entrySet()) {
+                    out.write(entry.getKey().getBytes());
+                    out.write(DelimiterUtils.FIELD_DELIM.getBytes());
+                    out.write(entry.getValue().getBytes());
+                    out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
+                }
+                out.flush();
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+            return jobFS.makeQualified(metaFilename).toString();
+        } else {
+            return null;
+        }
+    }
+
+    public static void updateEventMetadata(ReplicationEventMetadata data, final String dbName, final String tableName,
+                                           final String srcFilename, final String tgtFilename) {
+        if (data == null || data.getEventFileMetadata() == null) {
+            return;
+        }
+        StringBuilder key = new StringBuilder();
+
+        if (StringUtils.isNotEmpty(dbName)) {
+            key.append(Base64.encodeBase64URLSafeString(dbName.toLowerCase().getBytes()));
+        }
+        key.append(DelimiterUtils.FIELD_DELIM);
+        if (StringUtils.isNotEmpty(tableName)) {
+            key.append(Base64.encodeBase64URLSafeString(tableName.toLowerCase().getBytes()));
+        }
+
+        StringBuilder value = new StringBuilder();
+        if (StringUtils.isNotEmpty(srcFilename)) {
+            value.append(srcFilename);
+        }
+        value.append(DelimiterUtils.FIELD_DELIM);
+
+        if (StringUtils.isNotEmpty(tgtFilename)) {
+            value.append(tgtFilename);
+        }
+
+        data.getEventFileMetadata().put(key.toString(), value.toString());
+    }
+
+    public static void updateEventMetadata(ReplicationEventMetadata data, final ReplicationEventMetadata inputData) {
+        if (data == null || data.getEventFileMetadata() == null || inputData == null
+                || inputData.getEventFileMetadata() == null || inputData.getEventFileMetadata().isEmpty()) {
+            return;
+        }
+
+        data.getEventFileMetadata().putAll(inputData.getEventFileMetadata());
+    }
+
+    public Path getSrcFileName(final String identifier) {
+        return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + SRCFILE_EXTENSION));
+    }
+
+    public Path getTargetFileName(final String identifier) {
+        return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + TGTFILE_EXTENSION));
+    }
+
+    public void cleanUpEventInputDir() {
+        if (!shouldKeepHistory) {
+            try {
+                jobFS.delete(eventsInputDirPath, true);
+                eventsInputDirPath = null;
+            } catch (IOException e) {
+                LOG.error("Unable to cleanup: {}", eventsInputDirPath, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
new file mode 100644
index 0000000..0b4200c
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.HiveDRArgs;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Utility class to handle Hive events for data-mirroring.
+ */
+public class EventUtils {
+    private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
+    private static final int TIMEOUT_IN_SECS = 300;
+    private static final String JDBC_PREFIX = "jdbc:";
+    private static final int RETRY_ATTEMPTS = 3;
+
+    private Configuration conf = null;
+    private String sourceHiveServer2Uri = null;
+    private String sourceDatabase = null;
+    private String sourceNN = null;
+    private String sourceNNKerberosPrincipal = null;
+    private String targetHiveServer2Uri = null;
+    private String targetStagingPath = null;
+    private String targetNN = null;
+    private String targetNNKerberosPrincipal = null;
+    private String targetStagingUri = null;
+    private List<Path> sourceCleanUpList = null;
+    private List<Path> targetCleanUpList = null;
+    private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
+
+    private FileSystem sourceFileSystem = null;
+    private FileSystem targetFileSystem = null;
+    private Connection sourceConnection = null;
+    private Connection targetConnection = null;
+    private Statement sourceStatement = null;
+    private Statement targetStatement = null;
+
+    private List<ReplicationStatus> listReplicationStatus;
+
+    public EventUtils(Configuration conf) {
+        this.conf = conf;
+        sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName());
+        sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
+        sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
+        sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
+        targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
+        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
+                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+        targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
+        targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
+        sourceCleanUpList = new ArrayList<Path>();
+        targetCleanUpList = new ArrayList<Path>();
+    }
+
+    public void setupConnection() throws Exception {
+        Class.forName(DRIVER_NAME);
+        DriverManager.setLoginTimeout(TIMEOUT_IN_SECS);
+        String authTokenString = ";auth=delegationToken";
+        //To bypass findbugs check, need to store empty password in Properties.
+        Properties password = new Properties();
+        password.put("password", "");
+        String user = "";
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser != null) {
+            user = currentUser.getShortUserName();
+        }
+
+        if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
+                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
+            String connString = JDBC_PREFIX + sourceHiveServer2Uri + "/" + sourceDatabase;
+            if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
+                connString += authTokenString;
+            }
+            sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
+            sourceStatement = sourceConnection.createStatement();
+        } else {
+            String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase;
+            if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
+                connString += authTokenString;
+            }
+            targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
+            targetStatement = targetConnection.createStatement();
+        }
+    }
+
+    public void initializeFS() throws IOException {
+        LOG.info("Initializing staging directory");
+        targetStagingUri = new Path(targetNN, targetStagingPath).toString();
+        sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal));
+        targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal));
+    }
+
+    private String readEvents(Path eventFileName) throws IOException {
+        StringBuilder eventString = new StringBuilder();
+        BufferedReader in = new BufferedReader(new InputStreamReader(sourceFileSystem.open(eventFileName)));
+        try {
+            String line;
+            while ((line=in.readLine())!=null) {
+                eventString.append(line);
+                eventString.append(DelimiterUtils.NEWLINE_DELIM);
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+
+        return eventString.toString();
+    }
+
+    public void processEvents(String event) throws Exception {
+        listReplicationStatus = new ArrayList<ReplicationStatus>();
+        String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM);
+        String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8");
+        String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8");
+        String exportEventStr;
+        String importEventStr;
+        if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
+                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
+            exportEventStr = readEvents(new Path(eventSplit[2]));
+            if (StringUtils.isNotEmpty(exportEventStr)) {
+                LOG.info("Process the export statements for db {} table {}", dbName, tableName);
+                processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false);
+                if (!sourceCleanUpList.isEmpty()) {
+                    invokeCopy(sourceCleanUpList);
+                }
+            }
+        } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
+                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
+            importEventStr = readEvents(new Path(eventSplit[3]));
+            if (StringUtils.isNotEmpty(importEventStr)) {
+                LOG.info("Process the import statements for db {} table {}", dbName, tableName);
+                processCommands(importEventStr, dbName, tableName, targetStatement, targetCleanUpList, true);
+            }
+        }
+    }
+
+    public List<ReplicationStatus> getListReplicationStatus() {
+        return listReplicationStatus;
+    }
+
+    private void processCommands(String eventStr, String dbName, String tableName, Statement sqlStmt,
+                                 List<Path> cleanUpList, boolean isImportStatements)
+        throws SQLException, HiveReplicationException, IOException {
+        String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM);
+        List<Command> deserializeCommand = new ArrayList<Command>();
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            deserializeCommand.add(cmd);
+            List<String> cleanupLocations = cmd.cleanupLocationsAfterEvent();
+            cleanUpList.addAll(getCleanUpPaths(cleanupLocations));
+        }
+        for (Command cmd : deserializeCommand) {
+            try {
+                LOG.debug("Executing command : {} : {} ", cmd.getEventId(), cmd.toString());
+                executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, 0);
+            } catch (Exception e) {
+                // clean up locations before failing.
+                cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
+                cleanupEventLocations(targetCleanUpList, targetFileSystem);
+                throw new HiveReplicationException("Could not process replication command for "
+                        + " DB Name:" + dbName + ", Table Name:" + tableName, e);
+            }
+        }
+    }
+
+    private void executeCommand(Command cmd, String dbName, String tableName,
+                                Statement sqlStmt, boolean isImportStatements, int attempt)
+        throws HiveReplicationException, SQLException, IOException {
+        for (final String stmt : cmd.get()) {
+            executeSqlStatement(cmd, dbName, tableName, sqlStmt, stmt, isImportStatements, attempt);
+        }
+        if (isImportStatements) {
+            addReplicationStatus(ReplicationStatus.Status.SUCCESS, dbName, tableName, cmd.getEventId());
+        }
+    }
+
+    private void executeSqlStatement(Command cmd, String dbName, String tableName,
+                                     Statement sqlStmt, String stmt, boolean isImportStatements, int attempt)
+        throws HiveReplicationException, SQLException, IOException {
+        try {
+            sqlStmt.execute(stmt);
+        } catch (SQLException sqeOuter) {
+            // Retry if command is retriable.
+            if (attempt < RETRY_ATTEMPTS && cmd.isRetriable()) {
+                if (isImportStatements) {
+                    try {
+                        cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), targetFileSystem);
+                    } catch (IOException ioe) {
+                        // Clean up failed before retry on target. Update failure status and return
+                        addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName,
+                                tableName, cmd.getEventId());
+                        throw ioe;
+                    }
+                } else {
+                    cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), sourceFileSystem);
+                }
+                executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, ++attempt);
+                return; // Retry succeeded, return without throwing an exception.
+            }
+            // If we reached here, retries have failed.
+            LOG.error("SQL Exception: {}", sqeOuter);
+            undoCommand(cmd, dbName, tableName, sqlStmt, isImportStatements);
+            if (isImportStatements) {
+                addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, tableName, cmd.getEventId());
+            }
+            throw sqeOuter;
+        }
+    }
+
+    private static List<Path> getCleanUpPaths(List<String> cleanupLocations) {
+        List<Path> cleanupLocationPaths = new ArrayList<Path>();
+        for (String cleanupLocation : cleanupLocations) {
+            cleanupLocationPaths.add(new Path(cleanupLocation));
+        }
+        return cleanupLocationPaths;
+    }
+
+    private void undoCommand(Command cmd, String dbName,
+                             String tableName, Statement sqlStmt, boolean isImportStatements)
+        throws SQLException, HiveReplicationException {
+        if (cmd.isUndoable()) {
+            try {
+                List<String> undoCommands = cmd.getUndo();
+                LOG.debug("Undo command: {}", StringUtils.join(undoCommands.toArray()));
+                if (undoCommands.size() != 0) {
+                    for (final String undoStmt : undoCommands) {
+                        sqlStmt.execute(undoStmt);
+                    }
+                }
+            } catch (SQLException sqeInner) {
+                if (isImportStatements) {
+                    addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName,
+                            tableName, cmd.getEventId());
+                }
+                LOG.error("SQL Exception: {}", sqeInner);
+                throw sqeInner;
+            }
+        }
+    }
+
+    private void addReplicationStatus(ReplicationStatus.Status status, String dbName, String tableName, long eventId)
+        throws HiveReplicationException {
+        try {
+            String drJobName = conf.get(HiveDRArgs.JOB_NAME.getName());
+            ReplicationStatus rs = new ReplicationStatus(conf.get(HiveDRArgs.SOURCE_CLUSTER.getName()),
+                    conf.get(HiveDRArgs.TARGET_CLUSTER.getName()), drJobName, dbName, tableName, status, eventId);
+            listReplicationStatus.add(rs);
+        } catch (HiveReplicationException hre) {
+            throw new HiveReplicationException("Could not update replication status store for "
+                    + " EventId:" + eventId
+                    + " DB Name:" + dbName
+                    + " Table Name:" + tableName
+                    + hre.toString());
+        }
+    }
+
+    public void invokeCopy(List<Path> srcStagingPaths) throws Exception {
+        DistCpOptions options = getDistCpOptions(srcStagingPaths);
+        DistCp distCp = new DistCp(conf, options);
+        LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()),
+                targetStagingUri);
+        Job distcpJob = distCp.execute();
+        LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
+        LOG.info("Completed DistCp");
+    }
+
+    public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
+        srcStagingPaths.toArray(new Path[srcStagingPaths.size()]);
+
+        DistCpOptions distcpOptions = new DistCpOptions(srcStagingPaths, new Path(targetStagingUri));
+        /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be
+        copied to the same staging sir at target resulting in DuplicateFileException in DistCp.
+        */
+
+        distcpOptions.setSyncFolder(false);
+        distcpOptions.setBlocking(true);
+        distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
+        distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
+        return distcpOptions;
+    }
+
+    public void cleanEventsDirectory() throws IOException {
+        LOG.info("Cleaning staging directory");
+        cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
+        cleanupEventLocations(targetCleanUpList, targetFileSystem);
+    }
+
+    private void cleanupEventLocations(List<Path> cleanupList, FileSystem fileSystem)
+        throws IOException {
+        for (Path cleanUpPath : cleanupList) {
+            try {
+                fileSystem.delete(cleanUpPath, true);
+            } catch (IOException ioe) {
+                LOG.error("Cleaning up of staging directory {} failed {}", cleanUpPath, ioe.toString());
+                throw ioe;
+            }
+        }
+
+    }
+
+    public void closeConnection() throws SQLException {
+        if (sourceStatement != null) {
+            sourceStatement.close();
+        }
+
+        if (targetStatement != null) {
+            targetStatement.close();
+        }
+
+        if (sourceConnection != null) {
+            sourceConnection.close();
+        }
+        if (targetConnection != null) {
+            targetConnection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
new file mode 100644
index 0000000..6bd6319
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Utility class to validate HDFS files.
+ */
+public final class FileUtils {
+
+    public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH
+            + File.separator + "Events";
+    public static final FsPermission FS_PERMISSION_700 = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+
+    private FileUtils() {}
+
+    public static Configuration getConfiguration(final String writeEP, final String nnKerberosPrincipal) {
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", writeEP);
+        if (StringUtils.isNotEmpty(nnKerberosPrincipal)) {
+            conf.set("dfs.namenode.kerberos.principal", nnKerberosPrincipal);
+        }
+        return conf;
+    }
+
+    public static void validatePath(final FileSystem fileSystem, final Path basePath) throws IOException {
+        if (!fileSystem.exists(basePath)) {
+            throw new IOException("Please create base dir " + fileSystem.getUri() + basePath
+                    + ". Please set group to " + DRStatusStore.getStoreGroup()
+                    + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString());
+        }
+
+        if (!fileSystem.getFileStatus(basePath).getPermission().equals(DRStatusStore.DEFAULT_STORE_PERMISSION)
+                || !fileSystem.getFileStatus(basePath).getGroup().equalsIgnoreCase(DRStatusStore.getStoreGroup())) {
+            throw new IOException("Base dir " + fileSystem.getUri() + basePath
+                    + " does not have correct ownership/permissions."
+                    + " Please set group to " + DRStatusStore.getStoreGroup()
+                    + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString());
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
new file mode 100644
index 0000000..900afe8
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DRStatusStore implementation for hive.
+ */
+public class HiveDRStatusStore extends DRStatusStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DRStatusStore.class);
+    private FileSystem fileSystem;
+
+    private static final String DEFAULT_STORE_PATH = BASE_DEFAULT_STORE_PATH + "hiveReplicationStatusStore/";
+    private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
+
+    private static final String LATEST_FILE = "latest.json";
+    private static final int FILE_ROTATION_LIMIT = 10;
+    private static final int FILE_ROTATION_TIME = 86400000; // 1 day
+
+
+    public HiveDRStatusStore(FileSystem targetFileSystem) throws IOException {
+        init(targetFileSystem);
+    }
+
+    public HiveDRStatusStore(FileSystem targetFileSystem, String group) throws IOException {
+        HiveDRStatusStore.setStoreGroup(group);
+        init(targetFileSystem);
+    }
+
+    private void init(FileSystem targetFileSystem) throws IOException {
+        this.fileSystem = targetFileSystem;
+        Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
+        FileUtils.validatePath(fileSystem, basePath);
+
+        Path storePath = new Path(DEFAULT_STORE_PATH);
+        if (!fileSystem.exists(storePath)) {
+            if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
+                throw new IOException("mkdir failed for " + DEFAULT_STORE_PATH);
+            }
+        } else {
+            if (!fileSystem.getFileStatus(storePath).getPermission().equals(DEFAULT_STORE_PERMISSION)) {
+                throw new IOException("Base dir " + DEFAULT_STORE_PATH + "does not have correct permissions. "
+                        + "Please set to 777");
+            }
+        }
+    }
+
+     /**
+        get all DB updated by the job. get all current table statuses for the DB merge the latest repl
+        status with prev table repl statuses. If all statuses are success, store the status as success
+        with largest eventId for the DB else store status as failure for the DB and lowest eventId.
+     */
+    @Override
+    public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
+        throws HiveReplicationException {
+        Map<String, DBReplicationStatus> dbStatusMap = new HashMap<String, DBReplicationStatus>();
+        for (ReplicationStatus status : statusList) {
+            if (!status.getJobName().equals(jobName)) {
+                String error = "JobName for status does not match current job \"" + jobName
+                        + "\". Status is " + status.toJsonString();
+                LOG.error(error);
+                throw new HiveReplicationException(error);
+            }
+
+            // init dbStatusMap and tableStatusMap from existing statuses.
+            if (!dbStatusMap.containsKey(status.getDatabase())) {
+                DBReplicationStatus dbStatus = getDbReplicationStatus(status.getSourceUri(), status.getTargetUri(),
+                        status.getJobName(), status.getDatabase());
+                dbStatusMap.put(status.getDatabase(), dbStatus);
+            }
+
+            // update existing statuses with new status for db/tables
+            if (StringUtils.isEmpty(status.getTable())) { // db level replication status.
+                dbStatusMap.get(status.getDatabase()).updateDbStatus(status);
+            } else { // table level replication status
+                dbStatusMap.get(status.getDatabase()).updateTableStatus(status);
+            }
+        }
+        // write to disk
+        for (Map.Entry<String, DBReplicationStatus> entry : dbStatusMap.entrySet()) {
+            writeStatusFile(entry.getValue());
+        }
+    }
+
+    @Override
+    public ReplicationStatus getReplicationStatus(String source, String target, String jobName, String database)
+        throws HiveReplicationException {
+        return getReplicationStatus(source, target, jobName, database, null);
+    }
+
+
+    public ReplicationStatus getReplicationStatus(String source, String target,
+                                                  String jobName, String database,
+                                                  String table) throws HiveReplicationException {
+        if (StringUtils.isEmpty(table)) {
+            return getDbReplicationStatus(source, target, jobName, database).getDatabaseStatus();
+        } else {
+            return getDbReplicationStatus(source, target, jobName, database).getTableStatus(table);
+        }
+    }
+
+    @Override
+    public Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
+                                                                       String jobName, String database)
+        throws HiveReplicationException {
+        DBReplicationStatus dbReplicationStatus = getDbReplicationStatus(source, target, jobName, database);
+        return dbReplicationStatus.getTableStatusIterator();
+    }
+
+    @Override
+    public void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException {
+        Path deletePath = getStatusDirPath(database, jobName);
+        try {
+            if (fileSystem.exists(deletePath)) {
+                fileSystem.delete(deletePath, true);
+            }
+        } catch (IOException e) {
+            throw new HiveReplicationException("Failed to delete status for Job "
+                    + jobName + " and DB "+ database, e);
+        }
+
+    }
+
+    private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
+                                                       String database) throws HiveReplicationException{
+        DBReplicationStatus dbReplicationStatus = null;
+        Path statusDirPath = getStatusDirPath(database, jobName);
+        // check if database name or jobName can contain chars not allowed by hdfs dir/file naming.
+        // if yes, use md5 of the same for dir names. prefer to use actual db names for readability.
+
+        try {
+            if (fileSystem.exists(statusDirPath)) {
+                dbReplicationStatus = readStatusFile(statusDirPath);
+            }
+            if (null == dbReplicationStatus) {
+                // Init replication state for this database
+                ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
+                        database, null, ReplicationStatus.Status.INIT, -1);
+                dbReplicationStatus = new DBReplicationStatus(initDbStatus);
+                if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
+                    String error = "mkdir failed for " + statusDirPath.toString();
+                    LOG.error(error);
+                    throw new HiveReplicationException(error);
+                }
+                writeStatusFile(dbReplicationStatus);
+            }
+            return dbReplicationStatus;
+        } catch (IOException e) {
+            String error = "Failed to get ReplicationStatus for job " + jobName;
+            LOG.error(error);
+            throw new HiveReplicationException(error);
+        }
+    }
+
+    private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) {
+        ReplicationStatus status = dbReplicationStatus.getDatabaseStatus();
+        return getStatusDirPath(status.getDatabase(), status.getJobName());
+    }
+
+    public Path getStatusDirPath(String database, String jobName) {
+        return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName);
+    }
+
+    private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
+        dbReplicationStatus.updateDbStatusFromTableStatuses();
+        String statusDir = getStatusDirPath(dbReplicationStatus).toString();
+        try {
+            Path latestFile = new Path(statusDir + "/" + LATEST_FILE);
+            if (fileSystem.exists(latestFile)) {
+                Path renamedFile = new Path(statusDir + "/"
+                        + String.valueOf(fileSystem.getFileStatus(latestFile).getModificationTime()) + ".json");
+                fileSystem.rename(latestFile, renamedFile);
+            }
+
+            FSDataOutputStream stream = FileSystem.create(fileSystem, latestFile, DEFAULT_STATUS_DIR_PERMISSION);
+            stream.write(dbReplicationStatus.toJsonString().getBytes());
+            stream.close();
+
+        } catch (IOException e) {
+            String error = "Failed to write latest Replication status into dir " + statusDir;
+            LOG.error(error);
+            throw new HiveReplicationException(error);
+        }
+
+        rotateStatusFiles(new Path(statusDir), FILE_ROTATION_LIMIT, FILE_ROTATION_TIME);
+    }
+
+    public void rotateStatusFiles(Path statusDir, int numFiles, int maxFileAge) throws HiveReplicationException {
+
+        List<String> fileList = new ArrayList<String>();
+        long now = System.currentTimeMillis();
+        try {
+            RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(statusDir, false);
+            while (fileIterator.hasNext()) {
+                fileList.add(fileIterator.next().getPath().toString());
+            }
+            if (fileList.size() > (numFiles+1)) {
+                // delete some files, as long as they are older than the time.
+                Collections.sort(fileList);
+                for (String file : fileList.subList(0, (fileList.size() - numFiles + 1))) {
+                    long modTime = fileSystem.getFileStatus(new Path(file)).getModificationTime();
+                    if ((now - modTime) > maxFileAge) {
+                        Path deleteFilePath = new Path(file);
+                        if (fileSystem.exists(deleteFilePath)) {
+                            fileSystem.delete(deleteFilePath, false);
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            String error = "Failed to rotate status files in dir " + statusDir.toString();
+            LOG.error(error);
+            throw new HiveReplicationException(error);
+        }
+    }
+
+    private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplicationException {
+        try {
+            Path statusFile = new Path(statusDirPath.toString() + "/" + LATEST_FILE);
+            if ((!fileSystem.exists(statusDirPath)) || (!fileSystem.exists(statusFile))) {
+                return null;
+            } else {
+                return new DBReplicationStatus(IOUtils.toString(fileSystem.open(statusFile)));
+            }
+        } catch (IOException e) {
+            String error = "Failed to read latest Replication status from dir " + statusDirPath.toString();
+            LOG.error(error);
+            throw new HiveReplicationException(error);
+        }
+    }
+
+    public void checkForReplicationConflict(String newSource, String jobName,
+                                             String database, String table) throws HiveReplicationException {
+        try {
+            Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json");
+            FileStatus[] files = fileSystem.globStatus(globPath);
+            for(FileStatus file : files) {
+                DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(
+                        fileSystem.open(file.getPath())));
+                ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus();
+
+                if (!(newSource.equals(existingJob.getSourceUri()))) {
+                    throw new HiveReplicationException("Two different sources are attempting to replicate to same db "
+                            + database + ". New Source = " + newSource
+                            + ", Existing Source = " + existingJob.getSourceUri());
+                } // two different sources replicating to same DB. Conflict
+                if (jobName.equals(existingJob.getJobName())) {
+                    continue;
+                } // same job, no conflict.
+
+                if (StringUtils.isEmpty(table)) {
+                    // When it is DB level replication, two different jobs cannot replicate to same DB
+                    throw new HiveReplicationException("Two different jobs are attempting to replicate to same db "
+                            + database.toLowerCase() + ". New Job = " + jobName
+                            + ", Existing Job = " + existingJob.getJobName());
+                }
+
+                /*
+                At this point, it is different table level jobs replicating from same newSource to same target. This is
+                allowed as long as the target tables are different. For example, job1 can replicate db1.table1 and
+                job2 can replicate db1.table2.  Both jobs cannot replicate to same table.
+                 */
+                for(Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) {
+                    if (table.equals(entry.getKey())) {
+                        throw new HiveReplicationException("Two different jobs are trying to replicate to same table "
+                                + entry.getKey() + ". New job = " + jobName
+                                + ", Existing job = " + existingJob.getJobName());
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new HiveReplicationException("Failed to read status files for DB "
+                    + database, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
new file mode 100644
index 0000000..d9d6ab0
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Hive replication utility class.
+ */
+public final class HiveDRUtils {
+    /**
+     * Enum for Hive replication type.
+     */
+    public enum ReplicationType {
+        TABLE,
+        DB
+    }
+
+    /**
+     * Enum for hive-dr action type.
+     */
+    public enum ExecutionStage {
+        IMPORT,
+        EXPORT,
+        LASTEVENTS
+    }
+
+    private static final String ALL_TABLES = "*";
+
+    public static final String SEPARATOR = File.separator;
+
+    private HiveDRUtils() {}
+
+    public static ReplicationType getReplicationType(List<String> sourceTables) {
+        return (sourceTables.size() == 1 && sourceTables.get(0).equals(ALL_TABLES)) ? ReplicationType.DB
+                : ReplicationType.TABLE;
+    }
+
+    public static Configuration getDefaultConf() throws IOException {
+        Configuration conf = new Configuration();
+        conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+        String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
+        if (delegationToken != null) {
+            conf.set("mapreduce.job.credentials.binary", delegationToken);
+            conf.set("tez.credentials.path", delegationToken);
+        }
+        return conf;
+    }
+
+    public static String getFilePathFromEnv(String env) {
+        String path = System.getenv(env);
+        if (path != null && Shell.WINDOWS) {
+            // In Windows, file paths are enclosed in \" so remove them here
+            // to avoid path errors
+            if (path.charAt(0) == '"') {
+                path = path.substring(1);
+            }
+            if (path.charAt(path.length() - 1) == '"') {
+                path = path.substring(0, path.length() - 1);
+            }
+        }
+        return path;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
new file mode 100644
index 0000000..ea19f09
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Create hive metastore client for user.
+ */
+public final class HiveMetastoreUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreUtils.class);
+
+    private HiveMetastoreUtils() {}
+
+    public static HCatClient initializeHiveMetaStoreClient(String metastoreUri, String metastorePrincipal,
+                                                    String hive2Principal) throws Exception {
+        try {
+            HiveConf hcatConf = createHiveConf(HiveDRUtils.getDefaultConf(),
+                    metastoreUri, metastorePrincipal, hive2Principal);
+            HCatClient client = HCatClient.create(hcatConf);
+            return client;
+        } catch (IOException e) {
+            throw new Exception("Exception creating HCatClient: " + e.getMessage(), e);
+        }
+    }
+
+    private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal,
+                                           String hive2Principal) throws IOException {
+        JobConf jobConf = new JobConf(conf);
+        String delegationToken = HiveDRUtils.getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
+        if (delegationToken != null) {
+            Credentials credentials = Credentials.readTokenStorageFile(new File(delegationToken), conf);
+            jobConf.setCredentials(credentials);
+            UserGroupInformation.getCurrentUser().addCredentials(credentials);
+        }
+
+        HiveConf hcatConf = new HiveConf(jobConf, HiveConf.class);
+
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname, EximReplicationTaskFactory.class.getName());
+        if (StringUtils.isNotEmpty(metastorePrincipal)) {
+            hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal);
+            hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+            hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true");
+            hcatConf.set("hadoop.rpc.protection", "authentication");
+        }
+        if (StringUtils.isNotEmpty(hive2Principal)) {
+            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal);
+            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos");
+        }
+
+        return hcatConf;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
new file mode 100644
index 0000000..bb33772
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Object to store replication status of a DB or a table.
+ */
+public class ReplicationStatus {
+
+    public static final int INDENT_FACTOR = 4;
+    private static final String SOURCE = "sourceUri";
+    private static final String TARGET = "targetUri";
+    private static final String JOB_NAME = "jobName";
+    private static final String DATABASE = "database";
+    private static final String TABLE = "table";
+    private static final String EVENT_ID = "eventId";
+    private static final String STATUS_KEY = "status";
+    private static final String STATUS_LOG = "statusLog";
+
+    /**
+     * Replication Status enum.
+     */
+    public static enum Status {
+        INIT,
+        SUCCESS,
+        FAILURE
+    }
+
+    private String sourceUri;
+    private String targetUri;
+    private String jobName;
+    private String database;
+    private String table;
+    private Status status = Status.SUCCESS;
+    private long eventId = -1;
+    private String log;
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    public ReplicationStatus(String sourceUri, String targetUri, String jobName,
+                             String database, String table,
+                             ReplicationStatus.Status status, long eventId) throws HiveReplicationException {
+        init(sourceUri, targetUri, jobName, database, table, status, eventId, null);
+    }
+
+    private void init(String source, String target, String job,
+                      String dbName, String tableName, ReplicationStatus.Status replStatus,
+                      long eventNum, String logStr) throws HiveReplicationException {
+        setSourceUri(source);
+        setTargetUri(target);
+        setJobName(job);
+        setDatabase(dbName);
+        setTable(tableName);
+        setStatus(replStatus);
+        setEventId(eventNum);
+        setLog(logStr);
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    public ReplicationStatus(String jsonString) throws HiveReplicationException {
+        try {
+            JSONObject object = new JSONObject(jsonString);
+            Status objectStatus;
+            try {
+                objectStatus = ReplicationStatus.Status.valueOf(object.getString(STATUS_KEY).toUpperCase());
+            } catch (IllegalArgumentException e1) {
+                throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus."
+                        + " Invalid status " + object.getString(STATUS_KEY), e1);
+            }
+
+            init(object.getString(SOURCE), object.getString(TARGET), object.getString(JOB_NAME),
+                    object.getString(DATABASE), object.has(TABLE) ? object.getString(TABLE) : null,
+                    objectStatus, object.has(EVENT_ID) ? object.getLong(EVENT_ID) : -1,
+                    object.has(STATUS_LOG) ? object.getString(STATUS_LOG) : null);
+        } catch (JSONException e) {
+            throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus ", e);
+        }
+
+    }
+
+    public String toJsonString() throws HiveReplicationException {
+        try {
+            return toJsonObject().toString(INDENT_FACTOR);
+        } catch (JSONException e) {
+            throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e);
+        }
+    }
+
+    public JSONObject toJsonObject() throws HiveReplicationException {
+        JSONObject jsonObject = new JSONObject();
+        try {
+            jsonObject.put(SOURCE, this.sourceUri);
+            jsonObject.put(TARGET, this.targetUri);
+            jsonObject.put(JOB_NAME, this.jobName);
+            jsonObject.put(DATABASE, this.database);
+            if (StringUtils.isNotEmpty(this.table)) {
+                jsonObject.put(TABLE, this.table);
+            }
+            jsonObject.put(STATUS_KEY, this.status.name());
+            if (this.eventId > -1) {
+                jsonObject.put(EVENT_ID, this.eventId);
+            } else {
+                jsonObject.put(EVENT_ID, -1);
+            }
+            if (StringUtils.isNotEmpty(this.log)) {
+                jsonObject.put(STATUS_LOG, this.log);
+            }
+            return jsonObject;
+        } catch (JSONException e) {
+            throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e);
+        }
+    }
+
+    public String getSourceUri() {
+        return this.sourceUri;
+    }
+
+    public void setSourceUri(String source) throws HiveReplicationException {
+        validateString(SOURCE, source);
+        this.sourceUri = source;
+    }
+
+    public String getTargetUri() {
+        return this.targetUri;
+    }
+
+    public void setTargetUri(String target) throws HiveReplicationException {
+        validateString(TARGET, target);
+        this.targetUri = target;
+    }
+
+    public String getJobName() {
+        return this.jobName;
+    }
+
+    public void setJobName(String jobName) throws HiveReplicationException {
+        validateString(JOB_NAME, jobName);
+        this.jobName = jobName;
+    }
+
+    public String getDatabase() {
+        return this.database;
+    }
+
+    public void setDatabase(String database) throws HiveReplicationException {
+        validateString(DATABASE, database);
+        this.database = database.toLowerCase();
+    }
+
+    public String getTable() {
+        return this.table;
+    }
+
+    public void setTable(String table) {
+        this.table = (table == null) ? null : table.toLowerCase();
+    }
+
+    public Status getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(Status status) throws HiveReplicationException {
+        if (status != null) {
+            this.status = status;
+        } else {
+            throw new HiveReplicationException("Failed to set ReplicationStatus. Input \""
+                    + STATUS_KEY + "\" cannot be empty");
+        }
+    }
+
+    public long getEventId() {
+        return this.eventId;
+    }
+
+    public void setEventId(long eventId) throws HiveReplicationException {
+        if (eventId > -1) {
+            this.eventId = eventId;
+        }
+    }
+
+    public String getLog() {
+        return this.log;
+    }
+
+    public void setLog(String log) {
+        this.log = log;
+    }
+
+    private void validateString(String inputName, String input) throws HiveReplicationException {
+        if (StringUtils.isEmpty(input)) {
+            throw new HiveReplicationException("Failed to set ReplicationStatus. Input \""
+                    + inputName + "\" cannot be empty");
+        }
+    }
+
+    public String toString() {
+        return sourceUri + "\t" + targetUri + "\t" + jobName + "\t"
+                + database + "\t"+ table + "\t" + status + "\t"+ eventId;
+    }
+
+}