Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/11 10:20:04 UTC
[4/5] falcon git commit: FALCON-1188 Falcon support for Hive
Replication. Contributed by Venkat Ranganathan.
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
new file mode 100644
index 0000000..98449f0
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Output committer for the Hive DR copy job: delegates to FileOutputCommitter
+ * and cleans up staging data once the job commits.
+ */
+public class CopyCommitter extends FileOutputCommitter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CopyCommitter.class);
+
+ /**
+ * Create a file output committer.
+ *
+ * @param outputPath the job's output path, or null if you want the output
+ * committer to act as a noop.
+ * @param context the task's context
+ * @throws java.io.IOException
+ */
+ public CopyCommitter(Path outputPath,
+ TaskAttemptContext context) throws IOException {
+ super(outputPath, context);
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+
+ try {
+ super.commitJob(jobContext);
+ } finally {
+ cleanup(conf);
+ }
+ }
+
+ private void cleanup(Configuration conf) {
+ // clean up staging and other data
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
new file mode 100644
index 0000000..5eb8acb
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.mapreduce;
+
+import org.apache.falcon.hive.HiveDRArgs;
+import org.apache.falcon.hive.util.EventUtils;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Mapper for Hive DR: replays the export/import commands for each event and
+ * emits per-table replication statuses.
+ */
+public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
+ private EventUtils eventUtils;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ eventUtils = new EventUtils(context.getConfiguration());
+ eventUtils.initializeFS();
+ try {
+ eventUtils.setupConnection();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void map(LongWritable key, Text value,
+ Context context) throws IOException, InterruptedException {
+ LOG.debug("Processing Event value: {}", value.toString());
+
+ try {
+ eventUtils.processEvents(value.toString());
+ } catch (Exception e) {
+ LOG.error("Exception in processing events:", e);
+ throw new IOException(e);
+ } finally {
+ cleanup(context);
+ }
+ List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();
+ if (replicationStatusList != null && !replicationStatusList.isEmpty()) {
+ for (ReplicationStatus rs : replicationStatusList) {
+ context.write(new Text(rs.getJobName()), new Text(rs.toString()));
+ }
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ LOG.info("Invoking cleanup process");
+ super.cleanup(context);
+ try {
+ if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
+ .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
+ eventUtils.cleanEventsDirectory();
+ }
+ } catch (IOException e) {
+ LOG.error("Cleaning up of events directories failed", e);
+ } finally {
+ try {
+ eventUtils.closeConnection();
+ } catch (SQLException e) {
+ LOG.error("Closing the connections failed", e);
+ }
+ }
+ }
+}
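
As an aside, a minimal sketch of how the classes in this patch could be wired
into a Hadoop job. The actual driver lives elsewhere in this commit, so the
job name and the conf variable here are assumptions:

    // Hypothetical job wiring; "conf" and the job name are placeholders.
    Job job = Job.getInstance(conf, "hive-dr");
    job.setMapperClass(CopyMapper.class);
    job.setReducerClass(CopyReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);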
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
new file mode 100644
index 0000000..50cb4b2
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.mapreduce;
+
+
+import org.apache.falcon.hive.HiveDRArgs;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.FileUtils;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Reducer for Hive DR: merges per-table replication statuses, ordered by
+ * event id, and persists them to the DR status store.
+ */
+public class CopyReducer extends Reducer<Text, Text, Text, Text> {
+ private DRStatusStore hiveDRStore;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ FileSystem fs = FileSystem.get(FileUtils.getConfiguration(
+ conf.get(HiveDRArgs.TARGET_NN.getName()),
+ conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
+ hiveDRStore = new HiveDRStatusStore(fs);
+ }
+
+ private List<ReplicationStatus> sortStatusList(List<ReplicationStatus> replStatusList) {
+ Collections.sort(replStatusList, new Comparator<ReplicationStatus>() {
+ @Override
+ public int compare(ReplicationStatus r1, ReplicationStatus r2) {
+ // Long.compare avoids the overflow risk of casting a long difference to int.
+ return Long.compare(r1.getEventId(), r2.getEventId());
+ }
+ });
+ return replStatusList;
+ }
+
+ @Override
+ protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
+ ReplicationStatus rs;
+ try {
+ for (Text value : values) {
+ String[] fields = (value.toString()).split("\t");
+ rs = new ReplicationStatus(fields[0], fields[1], fields[2], fields[3], fields[4],
+ ReplicationStatus.Status.valueOf(fields[5]), Long.parseLong(fields[6]));
+ replStatusList.add(rs);
+ }
+
+ hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList));
+ } catch (HiveReplicationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ }
+}
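
The reduce() parsing above pins down the record contract between CopyMapper and
CopyReducer: each ReplicationStatus travels as seven tab-separated fields. A
small round-trip sketch, assuming ReplicationStatus.toString() emits the same
field order that reduce() reads back (values invented):

    String record = "hdfs://src:8020" + "\t" + "hdfs://tgt:8020" + "\t" + "drJob"
            + "\t" + "salesdb" + "\t" + "t1" + "\t" + "SUCCESS" + "\t" + "42";
    String[] fields = record.split("\t");
    ReplicationStatus rs = new ReplicationStatus(fields[0], fields[1], fields[2],
            fields[3], fields[4],
            ReplicationStatus.Status.valueOf(fields[5]), Long.parseLong(fields[6]));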
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
new file mode 100644
index 0000000..6dceb8e
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to store the replication status of a DB and its tables.
+ */
+public class DBReplicationStatus {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DBReplicationStatus.class);
+ private static final String DB_STATUS = "db_status";
+ private static final String TABLE_STATUS = "table_status";
+
+ private Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>();
+ private ReplicationStatus databaseStatus;
+
+ public DBReplicationStatus(ReplicationStatus dbStatus) throws HiveReplicationException {
+ setDatabaseStatus(dbStatus);
+ }
+
+ public DBReplicationStatus(ReplicationStatus dbStatus,
+ Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException {
+ /*
+ The order of these set calls matters: setDatabaseStatus must run first so that
+ setTableStatuses can reject tables that do not belong to the same db.
+ */
+ setDatabaseStatus(dbStatus);
+ setTableStatuses(tableStatuses);
+ }
+
+ // Serialize
+ public String toJsonString() throws HiveReplicationException {
+ JSONObject retObject = new JSONObject();
+ JSONObject tableStatus = new JSONObject();
+ try {
+ for (Map.Entry<String, ReplicationStatus> status : tableStatuses.entrySet()) {
+ tableStatus.put(status.getKey(), status.getValue().toJsonObject());
+ }
+ retObject.put(DB_STATUS, databaseStatus.toJsonObject());
+ retObject.put(TABLE_STATUS, tableStatus);
+ return retObject.toString(ReplicationStatus.INDENT_FACTOR);
+ } catch (JSONException e) {
+ throw new HiveReplicationException("Unable to serialize Database Replication Status", e);
+ }
+ }
+
+ // de-serialize
+ public DBReplicationStatus(String jsonString) throws HiveReplicationException {
+ try {
+ JSONObject object = new JSONObject(jsonString);
+ ReplicationStatus dbstatus = new ReplicationStatus(object.get(DB_STATUS).toString());
+ setDatabaseStatus(dbstatus);
+
+ JSONObject tableJson = object.getJSONObject(TABLE_STATUS);
+ Iterator keys = tableJson.keys();
+ while (keys.hasNext()) {
+ String key = keys.next().toString();
+ ReplicationStatus value = new ReplicationStatus(tableJson.get(key).toString());
+ if (value.getDatabase().equals(dbstatus.getDatabase())) {
+ tableStatuses.put(key.toLowerCase(), value);
+ } else {
+ throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString. "
+ + "Cannot set status for table " + value.getDatabase() + "." + value.getTable()
+ + ", It does not belong to DB " + dbstatus.getDatabase());
+ }
+ }
+ } catch (JSONException e) {
+ throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString", e);
+ }
+ }
+
+ public Map<String, ReplicationStatus> getTableStatuses() {
+ return tableStatuses;
+ }
+
+ public ReplicationStatus getTableStatus(String tableName) throws HiveReplicationException {
+ tableName = tableName.toLowerCase();
+ if (tableStatuses.containsKey(tableName)) {
+ return tableStatuses.get(tableName);
+ }
+ return new ReplicationStatus(databaseStatus.getSourceUri(), databaseStatus.getTargetUri(),
+ databaseStatus.getJobName(), databaseStatus.getDatabase(),
+ tableName, ReplicationStatus.Status.INIT, -1);
+ }
+
+ public Iterator<ReplicationStatus> getTableStatusIterator() {
+ List<ReplicationStatus> resultSet = new ArrayList<ReplicationStatus>();
+ for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
+ resultSet.add(entry.getValue());
+ }
+ return resultSet.iterator();
+ }
+
+ private void setTableStatuses(Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException {
+ for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
+ if (!entry.getValue().getDatabase().equals(databaseStatus.getDatabase())) {
+ throw new HiveReplicationException("Cannot set status for table " + entry.getValue().getDatabase()
+ + "." + entry.getValue().getTable() + ", It does not belong to DB "
+ + databaseStatus.getDatabase());
+ } else {
+ this.tableStatuses.put(entry.getKey().toLowerCase(), entry.getValue());
+ }
+ }
+ }
+
+ public ReplicationStatus getDatabaseStatus() {
+ return databaseStatus;
+ }
+
+ private void setDatabaseStatus(ReplicationStatus databaseStatus) {
+ this.databaseStatus = databaseStatus;
+ }
+
+ /**
+ * Update DB status from table statuses.
+ case 1) All tables replicated successfully.
+ Take the largest successful eventId and set dbReplStatus as success
+ case 2) One or many tables failed to replicate
+ Take the smallest eventId amongst the failed tables and set dbReplStatus as failed.
+ */
+ public void updateDbStatusFromTableStatuses() throws HiveReplicationException {
+ if (tableStatuses.size() == 0) {
+ // nothing to do
+ return;
+ }
+
+ databaseStatus.setStatus(ReplicationStatus.Status.SUCCESS);
+ long successEventId = databaseStatus.getEventId();
+ long failedEventId = -1;
+
+ for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
+ long eventId = entry.getValue().getEventId();
+ if (entry.getValue().getStatus().equals(ReplicationStatus.Status.SUCCESS)) {
+ if (eventId > successEventId) {
+ successEventId = eventId;
+ }
+ } else if (entry.getValue().getStatus().equals(ReplicationStatus.Status.FAILURE)) {
+ databaseStatus.setStatus(ReplicationStatus.Status.FAILURE);
+ if (eventId < failedEventId || failedEventId == -1) {
+ failedEventId = eventId;
+ }
+ } // else: a table in Status.INIT should not change the DB's lastEventId
+ }
+
+ String log = "Updating DB Status based on table replication status. Status : "
+ + databaseStatus.getStatus().toString() + ", eventId : ";
+ if (databaseStatus.getStatus().equals(ReplicationStatus.Status.SUCCESS)) {
+ databaseStatus.setEventId(successEventId);
+ LOG.info(log + String.valueOf(successEventId));
+ } else if (databaseStatus.getStatus().equals(ReplicationStatus.Status.FAILURE)) {
+ databaseStatus.setEventId(failedEventId);
+ LOG.error(log + String.valueOf(failedEventId));
+ }
+
+ }
+
+ public void updateDbStatus(ReplicationStatus status) throws HiveReplicationException {
+ if (StringUtils.isNotEmpty(status.getTable())) {
+ throw new HiveReplicationException("Cannot update DB Status. This is table level status.");
+ }
+
+ if (this.databaseStatus.getDatabase().equals(status.getDatabase())) {
+ this.databaseStatus = status;
+ } else {
+ throw new HiveReplicationException("Cannot update Database Status. StatusDB "
+ + status.getDatabase() + " does not match current DB "
+ + this.databaseStatus.getDatabase());
+ }
+ }
+
+ public void updateTableStatus(ReplicationStatus status) throws HiveReplicationException {
+ if (StringUtils.isEmpty(status.getTable())) {
+ throw new HiveReplicationException("Cannot update Table Status. Table name is empty.");
+ }
+
+ if (this.databaseStatus.getDatabase().equals(status.getDatabase())) {
+ // Lowercase the key, consistent with getTableStatus() and setTableStatuses().
+ this.tableStatuses.put(status.getTable().toLowerCase(), status);
+ } else {
+ throw new HiveReplicationException("Cannot update Table Status. TableDB "
+ + status.getDatabase() + " does not match current DB "
+ + this.databaseStatus.getDatabase());
+ }
+ }
+}
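
A hypothetical walk-through of the roll-up rules in
updateDbStatusFromTableStatuses(), with invented URIs, job and table names.
One failed table drags the DB status down to FAILURE at the smallest failed
eventId, so replication can resume from that event:

    ReplicationStatus db = new ReplicationStatus("hdfs://src:8020", "hdfs://tgt:8020",
            "drJob", "salesdb", null, ReplicationStatus.Status.INIT, -1);
    DBReplicationStatus dbStatus = new DBReplicationStatus(db);
    // t1 replicated through event 120; t2 failed at event 95.
    dbStatus.updateTableStatus(new ReplicationStatus("hdfs://src:8020", "hdfs://tgt:8020",
            "drJob", "salesdb", "t1", ReplicationStatus.Status.SUCCESS, 120));
    dbStatus.updateTableStatus(new ReplicationStatus("hdfs://src:8020", "hdfs://tgt:8020",
            "drJob", "salesdb", "t2", ReplicationStatus.Status.FAILURE, 95));
    dbStatus.updateDbStatusFromTableStatuses();
    // The DB status is now FAILURE with eventId 95, the smallest failed event.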
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
new file mode 100644
index 0000000..cf6b7ad
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract class for Data Replication Status Store.
+ */
+public abstract class DRStatusStore {
+
+ public static final String BASE_DEFAULT_STORE_PATH = "/apps/data-mirroring/";
+ public static final FsPermission DEFAULT_STORE_PERMISSION =
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
+
+ private static String storeGroup = "users";
+
+
+ /**
+ * Update the replication status of the db/tables after replication job jobName completes.
+ * @param jobName Name of the replication job.
+ * @param statusList List of replication statuses of db/tables replicated by jobName.
+ */
+ public abstract void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
+ throws HiveReplicationException;
+
+ /**
+ * Get Replication status for a database.
+ * @param source Replication source uri.
+ * @param target Replication target uri.
+ * @param jobName Name of the replication job.
+ * @param database Name of the target database.
+ * @return ReplicationStatus of the database.
+ */
+ public abstract ReplicationStatus getReplicationStatus(String source, String target,
+ String jobName, String database)
+ throws HiveReplicationException;
+
+ /**
+ * Get Replication status for a table.
+ * @param source Replication source uri.
+ * @param target Replication target uri.
+ * @param jobName Name of the replication job.
+ * @param database Name of the target database.
+ * @param table Name of the target table.
+ * @return ReplicationStatus of the table.
+ */
+ public abstract ReplicationStatus getReplicationStatus(String source, String target,
+ String jobName, String database,
+ String table) throws HiveReplicationException;
+
+ /**
+ * Get Replication status of all tables in a database.
+ * @param source Replication source uri.
+ * @param target Replication target uri.
+ * @param jobName Name of the replication job.
+ * @param database Name of the target database.
+ * @return Iterator over the replication statuses of all tables in the database.
+ */
+ public abstract Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
+ String jobName, String database)
+ throws HiveReplicationException;
+
+
+ /**
+ * Delete a replication job.
+ * @param jobName Name of the replication job.
+ * @param database Name of the target database.
+ */
+ public abstract void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException;
+
+ public static String getStoreGroup() {
+ return storeGroup;
+ }
+
+ public static void setStoreGroup(String group) {
+ storeGroup = group;
+ }
+}
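
As a usage sketch, a caller holding a FileSystem handle for the target cluster
could look up the last replicated event for a table through this API.
HiveDRStatusStore, later in this patch, is the concrete implementation; the
URIs and names below are placeholders:

    DRStatusStore store = new HiveDRStatusStore(targetFileSystem);
    ReplicationStatus tableStatus = store.getReplicationStatus(
            "hdfs://src:8020", "hdfs://tgt:8020", "drJob", "salesdb", "t1");
    long lastReplicatedEventId = tableStatus.getEventId();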
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
new file mode 100644
index 0000000..3b3156f
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+/**
+ * Public delimiters used for event processing.
+ */
+public final class DelimiterUtils {
+ public static final String FIELD_DELIM = "\u0001";
+ public static final String NEWLINE_DELIM = System.getProperty("line.separator");
+ public static final String TAB_DELIM = "\t";
+
+ private DelimiterUtils() {}
+}
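
These delimiters carry event records between the job stages. A sketch of one
record in the layout that EventUtils.processEvents() (further below) consumes:
base64 db name, base64 table name, then the export and import event file
paths, joined by FIELD_DELIM (names and paths invented):

    String record = Base64.encodeBase64URLSafeString("salesdb".getBytes())
            + DelimiterUtils.FIELD_DELIM
            + Base64.encodeBase64URLSafeString("t1".getBytes())
            + DelimiterUtils.FIELD_DELIM + "/apps/data-mirroring/Events/drJob/id.src"
            + DelimiterUtils.FIELD_DELIM + "/apps/data-mirroring/Events/drJob/id.tgt";
    String[] fields = record.split(DelimiterUtils.FIELD_DELIM);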
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
new file mode 100644
index 0000000..fb695d0
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.ReplicationEventMetadata;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Utility methods for event sourcer.
+ */
+public class EventSourcerUtils {
+
+ private static final String METAFILE_EXTENSION = ".meta";
+ private static final String SRCFILE_EXTENSION = ".src";
+ private static final String TGTFILE_EXTENSION = ".tgt";
+ private Path eventsInputDirPath;
+ private final boolean shouldKeepHistory;
+ private final FileSystem jobFS;
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventSourcerUtils.class);
+
+ public EventSourcerUtils(final Configuration conf, final boolean shouldKeepHistory,
+ final String jobName) throws Exception {
+ this.shouldKeepHistory = shouldKeepHistory;
+ jobFS = FileSystem.get(conf);
+ init(jobName);
+ }
+
+ private void init(final String jobName) throws Exception {
+ // Create base dir to store events on cluster where job is running
+ Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
+ // Validate base path
+ FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));
+
+ if (!jobFS.exists(dir)) {
+ if (!jobFS.mkdirs(dir)) {
+ throw new Exception("Creating directory failed: " + dir);
+ }
+ }
+
+ eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName);
+
+ if (!jobFS.exists(eventsInputDirPath)) {
+ if (!jobFS.mkdirs(eventsInputDirPath)) {
+ throw new Exception("Creating directory failed: " + eventsInputDirPath);
+ }
+ }
+ }
+
+ public OutputStream getFileOutputStream(final String path) throws Exception {
+ return FileSystem.create(jobFS, new Path(path), FileUtils.FS_PERMISSION_700);
+ }
+
+ public void closeOutputStream(OutputStream out) throws IOException {
+ if (out != null) {
+ try {
+ out.flush();
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+ }
+
+ public void persistReplicationEvents(final OutputStream out,
+ final Iterable<? extends Command> cmds) throws Exception {
+ for (Command cmd : cmds) {
+ persistReplicationEvents(out, cmd);
+ }
+ }
+
+ public void persistReplicationEvents(final OutputStream out,
+ final Command cmd) throws Exception {
+ out.write(ReplicationUtils.serializeCommand(cmd).getBytes());
+ LOG.debug("HiveDR Serialized Repl Command : {}", cmd);
+ out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
+ }
+
+ public String persistToMetaFile(final ReplicationEventMetadata data, final String identifier) throws IOException {
+ if (data != null && data.getEventFileMetadata() != null && !data.getEventFileMetadata().isEmpty()) {
+ Path metaFilename = new Path(eventsInputDirPath.toString(), identifier + METAFILE_EXTENSION);
+ OutputStream out = null;
+
+ try {
+ out = FileSystem.create(jobFS, metaFilename, FileUtils.FS_PERMISSION_700);
+
+ for (Map.Entry<String, String> entry : data.getEventFileMetadata().entrySet()) {
+ out.write(entry.getKey().getBytes());
+ out.write(DelimiterUtils.FIELD_DELIM.getBytes());
+ out.write(entry.getValue().getBytes());
+ out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
+ }
+ out.flush();
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ return jobFS.makeQualified(metaFilename).toString();
+ } else {
+ return null;
+ }
+ }
+
+ public static void updateEventMetadata(ReplicationEventMetadata data, final String dbName, final String tableName,
+ final String srcFilename, final String tgtFilename) {
+ if (data == null || data.getEventFileMetadata() == null) {
+ return;
+ }
+ StringBuilder key = new StringBuilder();
+
+ if (StringUtils.isNotEmpty(dbName)) {
+ key.append(Base64.encodeBase64URLSafeString(dbName.toLowerCase().getBytes()));
+ }
+ key.append(DelimiterUtils.FIELD_DELIM);
+ if (StringUtils.isNotEmpty(tableName)) {
+ key.append(Base64.encodeBase64URLSafeString(tableName.toLowerCase().getBytes()));
+ }
+
+ StringBuilder value = new StringBuilder();
+ if (StringUtils.isNotEmpty(srcFilename)) {
+ value.append(srcFilename);
+ }
+ value.append(DelimiterUtils.FIELD_DELIM);
+
+ if (StringUtils.isNotEmpty(tgtFilename)) {
+ value.append(tgtFilename);
+ }
+
+ data.getEventFileMetadata().put(key.toString(), value.toString());
+ }
+
+ public static void updateEventMetadata(ReplicationEventMetadata data, final ReplicationEventMetadata inputData) {
+ if (data == null || data.getEventFileMetadata() == null || inputData == null
+ || inputData.getEventFileMetadata() == null || inputData.getEventFileMetadata().isEmpty()) {
+ return;
+ }
+
+ data.getEventFileMetadata().putAll(inputData.getEventFileMetadata());
+ }
+
+ public Path getSrcFileName(final String identifier) {
+ return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + SRCFILE_EXTENSION));
+ }
+
+ public Path getTargetFileName(final String identifier) {
+ return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + TGTFILE_EXTENSION));
+ }
+
+ public void cleanUpEventInputDir() {
+ if (!shouldKeepHistory) {
+ try {
+ jobFS.delete(eventsInputDirPath, true);
+ eventsInputDirPath = null;
+ } catch (IOException e) {
+ LOG.error("Unable to cleanup: {}", eventsInputDirPath, e);
+ }
+ }
+ }
+}
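
A short sketch of the metadata flow above, assuming ReplicationEventMetadata
exposes a pre-initialized map through getEventFileMetadata() and that
sourcerUtils is an EventSourcerUtils instance: updateEventMetadata() keys each
entry by base64(db) + FIELD_DELIM + base64(table) with srcFile + FIELD_DELIM +
tgtFile as the value, and persistToMetaFile() writes one entry per line:

    ReplicationEventMetadata metadata = new ReplicationEventMetadata();
    EventSourcerUtils.updateEventMetadata(metadata, "salesdb", "t1",
            sourcerUtils.getSrcFileName("event-0001").toString(),
            sourcerUtils.getTargetFileName("event-0001").toString());
    String metaFile = sourcerUtils.persistToMetaFile(metadata, "event-0001");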
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
new file mode 100644
index 0000000..0b4200c
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.HiveDRArgs;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Utility class to handle Hive events for data-mirroring.
+ */
+public class EventUtils {
+ private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
+ private static final int TIMEOUT_IN_SECS = 300;
+ private static final String JDBC_PREFIX = "jdbc:";
+ private static final int RETRY_ATTEMPTS = 3;
+
+ private Configuration conf = null;
+ private String sourceHiveServer2Uri = null;
+ private String sourceDatabase = null;
+ private String sourceNN = null;
+ private String sourceNNKerberosPrincipal = null;
+ private String targetHiveServer2Uri = null;
+ private String targetStagingPath = null;
+ private String targetNN = null;
+ private String targetNNKerberosPrincipal = null;
+ private String targetStagingUri = null;
+ private List<Path> sourceCleanUpList = null;
+ private List<Path> targetCleanUpList = null;
+ private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
+
+ private FileSystem sourceFileSystem = null;
+ private FileSystem targetFileSystem = null;
+ private Connection sourceConnection = null;
+ private Connection targetConnection = null;
+ private Statement sourceStatement = null;
+ private Statement targetStatement = null;
+
+ private List<ReplicationStatus> listReplicationStatus;
+
+ public EventUtils(Configuration conf) {
+ this.conf = conf;
+ sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName());
+ sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
+ sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
+ sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
+ targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
+ targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
+ + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+ targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
+ targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
+ sourceCleanUpList = new ArrayList<Path>();
+ targetCleanUpList = new ArrayList<Path>();
+ }
+
+ public void setupConnection() throws Exception {
+ Class.forName(DRIVER_NAME);
+ DriverManager.setLoginTimeout(TIMEOUT_IN_SECS);
+ String authTokenString = ";auth=delegationToken";
+ //To bypass findbugs check, need to store empty password in Properties.
+ Properties password = new Properties();
+ password.put("password", "");
+ String user = "";
+
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ if (currentUser != null) {
+ user = currentUser.getShortUserName();
+ }
+
+ if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
+ .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
+ String connString = JDBC_PREFIX + sourceHiveServer2Uri + "/" + sourceDatabase;
+ if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
+ connString += authTokenString;
+ }
+ sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
+ sourceStatement = sourceConnection.createStatement();
+ } else {
+ String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase;
+ if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
+ connString += authTokenString;
+ }
+ targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
+ targetStatement = targetConnection.createStatement();
+ }
+ }
+
+ public void initializeFS() throws IOException {
+ LOG.info("Initializing staging directory");
+ targetStagingUri = new Path(targetNN, targetStagingPath).toString();
+ sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal));
+ targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal));
+ }
+
+ private String readEvents(Path eventFileName) throws IOException {
+ StringBuilder eventString = new StringBuilder();
+ BufferedReader in = new BufferedReader(new InputStreamReader(sourceFileSystem.open(eventFileName)));
+ try {
+ String line;
+ while ((line = in.readLine()) != null) {
+ eventString.append(line);
+ eventString.append(DelimiterUtils.NEWLINE_DELIM);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+
+ return eventString.toString();
+ }
+
+ public void processEvents(String event) throws Exception {
+ listReplicationStatus = new ArrayList<ReplicationStatus>();
+ String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM);
+ String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8");
+ String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8");
+ String exportEventStr;
+ String importEventStr;
+ if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
+ .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
+ exportEventStr = readEvents(new Path(eventSplit[2]));
+ if (StringUtils.isNotEmpty(exportEventStr)) {
+ LOG.info("Process the export statements for db {} table {}", dbName, tableName);
+ processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false);
+ if (!sourceCleanUpList.isEmpty()) {
+ invokeCopy(sourceCleanUpList);
+ }
+ }
+ } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
+ .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
+ importEventStr = readEvents(new Path(eventSplit[3]));
+ if (StringUtils.isNotEmpty(importEventStr)) {
+ LOG.info("Process the import statements for db {} table {}", dbName, tableName);
+ processCommands(importEventStr, dbName, tableName, targetStatement, targetCleanUpList, true);
+ }
+ }
+ }
+
+ public List<ReplicationStatus> getListReplicationStatus() {
+ return listReplicationStatus;
+ }
+
+ private void processCommands(String eventStr, String dbName, String tableName, Statement sqlStmt,
+ List<Path> cleanUpList, boolean isImportStatements)
+ throws SQLException, HiveReplicationException, IOException {
+ String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM);
+ List<Command> deserializeCommand = new ArrayList<Command>();
+ for (String command : commandList) {
+ Command cmd = ReplicationUtils.deserializeCommand(command);
+ deserializeCommand.add(cmd);
+ List<String> cleanupLocations = cmd.cleanupLocationsAfterEvent();
+ cleanUpList.addAll(getCleanUpPaths(cleanupLocations));
+ }
+ for (Command cmd : deserializeCommand) {
+ try {
+ LOG.debug("Executing command : {} : {} ", cmd.getEventId(), cmd.toString());
+ executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, 0);
+ } catch (Exception e) {
+ // clean up locations before failing.
+ cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
+ cleanupEventLocations(targetCleanUpList, targetFileSystem);
+ throw new HiveReplicationException("Could not process replication command for "
+ + " DB Name:" + dbName + ", Table Name:" + tableName, e);
+ }
+ }
+ }
+
+ private void executeCommand(Command cmd, String dbName, String tableName,
+ Statement sqlStmt, boolean isImportStatements, int attempt)
+ throws HiveReplicationException, SQLException, IOException {
+ for (final String stmt : cmd.get()) {
+ executeSqlStatement(cmd, dbName, tableName, sqlStmt, stmt, isImportStatements, attempt);
+ }
+ if (isImportStatements) {
+ addReplicationStatus(ReplicationStatus.Status.SUCCESS, dbName, tableName, cmd.getEventId());
+ }
+ }
+
+ private void executeSqlStatement(Command cmd, String dbName, String tableName,
+ Statement sqlStmt, String stmt, boolean isImportStatements, int attempt)
+ throws HiveReplicationException, SQLException, IOException {
+ try {
+ sqlStmt.execute(stmt);
+ } catch (SQLException sqeOuter) {
+ // Retry if command is retriable.
+ if (attempt < RETRY_ATTEMPTS && cmd.isRetriable()) {
+ if (isImportStatements) {
+ try {
+ cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), targetFileSystem);
+ } catch (IOException ioe) {
+ // Clean up failed before retry on target. Update failure status and return
+ addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName,
+ tableName, cmd.getEventId());
+ throw ioe;
+ }
+ } else {
+ cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), sourceFileSystem);
+ }
+ executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, ++attempt);
+ return; // Retry succeeded, return without throwing an exception.
+ }
+ // If we reached here, retries have failed.
+ LOG.error("SQL Exception: {}", sqeOuter);
+ undoCommand(cmd, dbName, tableName, sqlStmt, isImportStatements);
+ if (isImportStatements) {
+ addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, tableName, cmd.getEventId());
+ }
+ throw sqeOuter;
+ }
+ }
+
+ private static List<Path> getCleanUpPaths(List<String> cleanupLocations) {
+ List<Path> cleanupLocationPaths = new ArrayList<Path>();
+ for (String cleanupLocation : cleanupLocations) {
+ cleanupLocationPaths.add(new Path(cleanupLocation));
+ }
+ return cleanupLocationPaths;
+ }
+
+ private void undoCommand(Command cmd, String dbName,
+ String tableName, Statement sqlStmt, boolean isImportStatements)
+ throws SQLException, HiveReplicationException {
+ if (cmd.isUndoable()) {
+ try {
+ List<String> undoCommands = cmd.getUndo();
+ LOG.debug("Undo command: {}", StringUtils.join(undoCommands.toArray()));
+ if (undoCommands.size() != 0) {
+ for (final String undoStmt : undoCommands) {
+ sqlStmt.execute(undoStmt);
+ }
+ }
+ } catch (SQLException sqeInner) {
+ if (isImportStatements) {
+ addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName,
+ tableName, cmd.getEventId());
+ }
+ LOG.error("SQL Exception: {}", sqeInner);
+ throw sqeInner;
+ }
+ }
+ }
+
+ private void addReplicationStatus(ReplicationStatus.Status status, String dbName, String tableName, long eventId)
+ throws HiveReplicationException {
+ try {
+ String drJobName = conf.get(HiveDRArgs.JOB_NAME.getName());
+ ReplicationStatus rs = new ReplicationStatus(conf.get(HiveDRArgs.SOURCE_CLUSTER.getName()),
+ conf.get(HiveDRArgs.TARGET_CLUSTER.getName()), drJobName, dbName, tableName, status, eventId);
+ listReplicationStatus.add(rs);
+ } catch (HiveReplicationException hre) {
+ throw new HiveReplicationException("Could not update replication status store for "
+ + " EventId:" + eventId
+ + " DB Name:" + dbName
+ + " Table Name:" + tableName
+ + hre.toString());
+ }
+ }
+
+ public void invokeCopy(List<Path> srcStagingPaths) throws Exception {
+ DistCpOptions options = getDistCpOptions(srcStagingPaths);
+ DistCp distCp = new DistCp(conf, options);
+ LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()),
+ targetStagingUri);
+ Job distcpJob = distCp.execute();
+ LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
+ LOG.info("Completed DistCp");
+ }
+
+ public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
+ DistCpOptions distcpOptions = new DistCpOptions(srcStagingPaths, new Path(targetStagingUri));
+ /* setSyncFolder is false to retain the source dir structure at the target. If set to true, all files
+ would be copied to the same staging dir at the target, resulting in DuplicateFileException in DistCp.
+ */
+
+ distcpOptions.setSyncFolder(false);
+ distcpOptions.setBlocking(true);
+ distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
+ distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
+ return distcpOptions;
+ }
+
+ public void cleanEventsDirectory() throws IOException {
+ LOG.info("Cleaning staging directory");
+ cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
+ cleanupEventLocations(targetCleanUpList, targetFileSystem);
+ }
+
+ private void cleanupEventLocations(List<Path> cleanupList, FileSystem fileSystem)
+ throws IOException {
+ for (Path cleanUpPath : cleanupList) {
+ try {
+ fileSystem.delete(cleanUpPath, true);
+ } catch (IOException ioe) {
+ LOG.error("Cleaning up of staging directory {} failed {}", cleanUpPath, ioe.toString());
+ throw ioe;
+ }
+ }
+ }
+
+ public void closeConnection() throws SQLException {
+ if (sourceStatement != null) {
+ sourceStatement.close();
+ }
+
+ if (targetStatement != null) {
+ targetStatement.close();
+ }
+
+ if (sourceConnection != null) {
+ sourceConnection.close();
+ }
+ if (targetConnection != null) {
+ targetConnection.close();
+ }
+ }
+}
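
Putting the pieces together, a driver-side sketch of how this class is
exercised; CopyMapper above follows the same setup/process/cleanup sequence.
The Configuration is assumed to carry all required HiveDRArgs (HS2 URIs,
NameNode endpoints, staging path, DistCp settings), and eventRecord is one
delimited line from the events file:

    EventUtils eventUtils = new EventUtils(conf);
    eventUtils.initializeFS();
    eventUtils.setupConnection();
    try {
        eventUtils.processEvents(eventRecord);
        List<ReplicationStatus> statuses = eventUtils.getListReplicationStatus();
    } finally {
        eventUtils.closeConnection();
    }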
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
new file mode 100644
index 0000000..6bd6319
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Utility class to validate HDFS files.
+ */
+public final class FileUtils {
+
+ public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH
+ + File.separator + "Events";
+ public static final FsPermission FS_PERMISSION_700 = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+
+ private FileUtils() {}
+
+ public static Configuration getConfiguration(final String writeEP, final String nnKerberosPrincipal) {
+ Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", writeEP);
+ if (StringUtils.isNotEmpty(nnKerberosPrincipal)) {
+ conf.set("dfs.namenode.kerberos.principal", nnKerberosPrincipal);
+ }
+ return conf;
+ }
+
+ public static void validatePath(final FileSystem fileSystem, final Path basePath) throws IOException {
+ if (!fileSystem.exists(basePath)) {
+ throw new IOException("Please create base dir " + fileSystem.getUri() + basePath
+ + ". Please set group to " + DRStatusStore.getStoreGroup()
+ + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString());
+ }
+
+ if (!fileSystem.getFileStatus(basePath).getPermission().equals(DRStatusStore.DEFAULT_STORE_PERMISSION)
+ || !fileSystem.getFileStatus(basePath).getGroup().equalsIgnoreCase(DRStatusStore.getStoreGroup())) {
+ throw new IOException("Base dir " + fileSystem.getUri() + basePath
+ + " does not have correct ownership/permissions."
+ + " Please set group to " + DRStatusStore.getStoreGroup()
+ + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
new file mode 100644
index 0000000..900afe8
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DRStatusStore implementation for hive.
+ */
+public class HiveDRStatusStore extends DRStatusStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveDRStatusStore.class);
+ private FileSystem fileSystem;
+
+ private static final String DEFAULT_STORE_PATH = BASE_DEFAULT_STORE_PATH + "hiveReplicationStatusStore/";
+ private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION =
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
+
+ private static final String LATEST_FILE = "latest.json";
+ private static final int FILE_ROTATION_LIMIT = 10;
+ private static final int FILE_ROTATION_TIME = 86400000; // 1 day
+
+
+ public HiveDRStatusStore(FileSystem targetFileSystem) throws IOException {
+ init(targetFileSystem);
+ }
+
+ public HiveDRStatusStore(FileSystem targetFileSystem, String group) throws IOException {
+ HiveDRStatusStore.setStoreGroup(group);
+ init(targetFileSystem);
+ }
+
+ private void init(FileSystem targetFileSystem) throws IOException {
+ this.fileSystem = targetFileSystem;
+ Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
+ FileUtils.validatePath(fileSystem, basePath);
+
+ Path storePath = new Path(DEFAULT_STORE_PATH);
+ if (!fileSystem.exists(storePath)) {
+ if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
+ throw new IOException("mkdir failed for " + DEFAULT_STORE_PATH);
+ }
+ } else {
+ if (!fileSystem.getFileStatus(storePath).getPermission().equals(DEFAULT_STORE_PERMISSION)) {
+ throw new IOException("Base dir " + DEFAULT_STORE_PATH + " does not have correct permissions."
+ + " Please set permissions to " + DEFAULT_STORE_PERMISSION);
+ }
+ }
+ }
+
+ /**
+ * Gets all DBs updated by the job and all current table statuses for each DB, then merges the
+ * latest replication statuses with the previous table statuses. If all statuses are SUCCESS,
+ * the DB status is stored as SUCCESS with the largest eventId; otherwise the DB status is
+ * stored as FAILURE with the lowest failed eventId.
+ */
+ @Override
+ public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
+ throws HiveReplicationException {
+ Map<String, DBReplicationStatus> dbStatusMap = new HashMap<String, DBReplicationStatus>();
+ for (ReplicationStatus status : statusList) {
+ if (!status.getJobName().equals(jobName)) {
+ String error = "JobName for status does not match current job \"" + jobName
+ + "\". Status is " + status.toJsonString();
+ LOG.error(error);
+ throw new HiveReplicationException(error);
+ }
+
+ // init dbStatusMap and tableStatusMap from existing statuses.
+ if (!dbStatusMap.containsKey(status.getDatabase())) {
+ DBReplicationStatus dbStatus = getDbReplicationStatus(status.getSourceUri(), status.getTargetUri(),
+ status.getJobName(), status.getDatabase());
+ dbStatusMap.put(status.getDatabase(), dbStatus);
+ }
+
+ // update existing statuses with new status for db/tables
+ if (StringUtils.isEmpty(status.getTable())) { // db level replication status.
+ dbStatusMap.get(status.getDatabase()).updateDbStatus(status);
+ } else { // table level replication status
+ dbStatusMap.get(status.getDatabase()).updateTableStatus(status);
+ }
+ }
+ // write to disk
+ for (Map.Entry<String, DBReplicationStatus> entry : dbStatusMap.entrySet()) {
+ writeStatusFile(entry.getValue());
+ }
+ }
+
+ @Override
+ public ReplicationStatus getReplicationStatus(String source, String target, String jobName, String database)
+ throws HiveReplicationException {
+ return getReplicationStatus(source, target, jobName, database, null);
+ }
+
+
+ public ReplicationStatus getReplicationStatus(String source, String target,
+ String jobName, String database,
+ String table) throws HiveReplicationException {
+ if (StringUtils.isEmpty(table)) {
+ return getDbReplicationStatus(source, target, jobName, database).getDatabaseStatus();
+ } else {
+ return getDbReplicationStatus(source, target, jobName, database).getTableStatus(table);
+ }
+ }
+
+ @Override
+ public Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
+ String jobName, String database)
+ throws HiveReplicationException {
+ DBReplicationStatus dbReplicationStatus = getDbReplicationStatus(source, target, jobName, database);
+ return dbReplicationStatus.getTableStatusIterator();
+ }
+
+ @Override
+ public void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException {
+ Path deletePath = getStatusDirPath(database, jobName);
+ try {
+ if (fileSystem.exists(deletePath)) {
+ fileSystem.delete(deletePath, true);
+ }
+ } catch (IOException e) {
+ throw new HiveReplicationException("Failed to delete status for Job "
+ + jobName + " and DB "+ database, e);
+ }
+
+ }
+
+ private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
+ String database) throws HiveReplicationException {
+ DBReplicationStatus dbReplicationStatus = null;
+ Path statusDirPath = getStatusDirPath(database, jobName);
+ // Note: if the database name or jobName can contain characters not allowed in HDFS
+ // dir/file names, an MD5 hash of the name should be used for the dir name instead;
+ // actual db names are preferred for readability.
+
+ try {
+ if (fileSystem.exists(statusDirPath)) {
+ dbReplicationStatus = readStatusFile(statusDirPath);
+ }
+ if (null == dbReplicationStatus) {
+ // Init replication state for this database
+ ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
+ database, null, ReplicationStatus.Status.INIT, -1);
+ dbReplicationStatus = new DBReplicationStatus(initDbStatus);
+ if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
+ String error = "mkdir failed for " + statusDirPath.toString();
+ LOG.error(error);
+ throw new HiveReplicationException(error);
+ }
+ writeStatusFile(dbReplicationStatus);
+ }
+ return dbReplicationStatus;
+ } catch (IOException e) {
+ String error = "Failed to get ReplicationStatus for job " + jobName;
+ LOG.error(error);
+ throw new HiveReplicationException(error, e);
+ }
+ }
+
+ private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) {
+ ReplicationStatus status = dbReplicationStatus.getDatabaseStatus();
+ return getStatusDirPath(status.getDatabase(), status.getJobName());
+ }
+
+ public Path getStatusDirPath(String database, String jobName) {
+ return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName);
+ }
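+
+ // For example (illustrative): for database "salesdb" and job "hiveDRJob", statuses live
+ // under <DEFAULT_STORE_PATH>/salesdb/hiveDRJob/, with the current status in latest.json
+ // and rotated copies named by their modification time, e.g. 1439280000000.json.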
+
+ private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
+ dbReplicationStatus.updateDbStatusFromTableStatuses();
+ String statusDir = getStatusDirPath(dbReplicationStatus).toString();
+ try {
+ Path latestFile = new Path(statusDir + "/" + LATEST_FILE);
+ if (fileSystem.exists(latestFile)) {
+ Path renamedFile = new Path(statusDir + "/"
+ + String.valueOf(fileSystem.getFileStatus(latestFile).getModificationTime()) + ".json");
+ fileSystem.rename(latestFile, renamedFile);
+ }
+
+ FSDataOutputStream stream = FileSystem.create(fileSystem, latestFile, DEFAULT_STATUS_DIR_PERMISSION);
+ try {
+ stream.write(dbReplicationStatus.toJsonString().getBytes());
+ } finally {
+ stream.close();
+ }
+
+ } catch (IOException e) {
+ String error = "Failed to write latest Replication status into dir " + statusDir;
+ LOG.error(error);
+ throw new HiveReplicationException(error, e);
+ }
+
+ rotateStatusFiles(new Path(statusDir), FILE_ROTATION_LIMIT, FILE_ROTATION_TIME);
+ }
+
+ public void rotateStatusFiles(Path statusDir, int numFiles, int maxFileAge) throws HiveReplicationException {
+
+ List<String> fileList = new ArrayList<String>();
+ long now = System.currentTimeMillis();
+ try {
+ RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(statusDir, false);
+ while (fileIterator.hasNext()) {
+ fileList.add(fileIterator.next().getPath().toString());
+ }
+ if (fileList.size() > (numFiles + 1)) {
+ // delete the oldest files, but only those older than maxFileAge milliseconds.
+ Collections.sort(fileList);
+ for (String file : fileList.subList(0, (fileList.size() - numFiles + 1))) {
+ long modTime = fileSystem.getFileStatus(new Path(file)).getModificationTime();
+ if ((now - modTime) > maxFileAge) {
+ Path deleteFilePath = new Path(file);
+ if (fileSystem.exists(deleteFilePath)) {
+ fileSystem.delete(deleteFilePath, false);
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ String error = "Failed to rotate status files in dir " + statusDir.toString();
+ LOG.error(error);
+ throw new HiveReplicationException(error, e);
+ }
+ }
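+
+ /* Illustrative retention behaviour: timestamped <modTime>.json names sort before
+ latest.json, so sorting the listing puts the oldest rotated files first; once the
+ directory grows past the rotation limit, those oldest files are deleted, but only
+ if they are older than maxFileAge milliseconds. */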
+
+ private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplicationException {
+ try {
+ Path statusFile = new Path(statusDirPath.toString() + "/" + LATEST_FILE);
+ if ((!fileSystem.exists(statusDirPath)) || (!fileSystem.exists(statusFile))) {
+ return null;
+ } else {
+ return new DBReplicationStatus(IOUtils.toString(fileSystem.open(statusFile)));
+ }
+ } catch (IOException e) {
+ String error = "Failed to read latest Replication status from dir " + statusDirPath.toString();
+ LOG.error(error);
+ throw new HiveReplicationException(error, e);
+ }
+ }
+
+ public void checkForReplicationConflict(String newSource, String jobName,
+ String database, String table) throws HiveReplicationException {
+ try {
+ Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json");
+ FileStatus[] files = fileSystem.globStatus(globPath);
+ for (FileStatus file : files) {
+ DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(
+ fileSystem.open(file.getPath())));
+ ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus();
+
+ if (!(newSource.equals(existingJob.getSourceUri()))) {
+ // two different sources replicating to the same DB: conflict.
+ throw new HiveReplicationException("Two different sources are attempting to replicate to same db "
+ + database + ". New Source = " + newSource
+ + ", Existing Source = " + existingJob.getSourceUri());
+ }
+ if (jobName.equals(existingJob.getJobName())) {
+ // same job: no conflict.
+ continue;
+ }
+
+ if (StringUtils.isEmpty(table)) {
+ // When it is DB level replication, two different jobs cannot replicate to same DB
+ throw new HiveReplicationException("Two different jobs are attempting to replicate to same db "
+ + database.toLowerCase() + ". New Job = " + jobName
+ + ", Existing Job = " + existingJob.getJobName());
+ }
+
+ /*
+ At this point, these are different table-level jobs replicating from the same source to
+ the same target. This is allowed as long as the target tables are different: for example,
+ job1 can replicate db1.table1 and job2 can replicate db1.table2, but two jobs cannot
+ replicate to the same table.
+ */
+ for (Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) {
+ if (table.equals(entry.getKey())) {
+ throw new HiveReplicationException("Two different jobs are trying to replicate to same table "
+ + entry.getKey() + ". New job = " + jobName
+ + ", Existing job = " + existingJob.getJobName());
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveReplicationException("Failed to read status files for DB "
+ + database, e);
+ }
+ }
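+
+ /* Summary of the conflict rules above (descriptive, not part of this patch):
+ - a different source URI replicating into the same DB is always a conflict;
+ - otherwise, the same job name is not a conflict (it is a re-run of the same job);
+ - a DB-level job conflicts with any other job on the same DB;
+ - table-level jobs conflict only when they replicate to the same table. */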
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
new file mode 100644
index 0000000..d9d6ab0
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Hive replication utility class.
+ */
+public final class HiveDRUtils {
+ /**
+ * Enum for Hive replication type.
+ */
+ public enum ReplicationType {
+ TABLE,
+ DB
+ }
+
+ /**
+ * Enum for hive-dr action type.
+ */
+ public enum ExecutionStage {
+ IMPORT,
+ EXPORT,
+ LASTEVENTS
+ }
+
+ private static final String ALL_TABLES = "*";
+
+ public static final String SEPARATOR = File.separator;
+
+ private HiveDRUtils() {}
+
+ public static ReplicationType getReplicationType(List<String> sourceTables) {
+ return (sourceTables.size() == 1 && sourceTables.get(0).equals(ALL_TABLES)) ? ReplicationType.DB
+ : ReplicationType.TABLE;
+ }
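+
+ // For example: a source table list of ["*"] yields ReplicationType.DB, while
+ // ["orders", "customers"] yields ReplicationType.TABLE.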
+
+ public static Configuration getDefaultConf() throws IOException {
+ Configuration conf = new Configuration();
+ conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+ String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
+ if (delegationToken != null) {
+ conf.set("mapreduce.job.credentials.binary", delegationToken);
+ conf.set("tez.credentials.path", delegationToken);
+ }
+ return conf;
+ }
+
+ public static String getFilePathFromEnv(String env) {
+ String path = System.getenv(env);
+ if (path != null && Shell.WINDOWS) {
+ // On Windows, file paths are enclosed in double quotes, so strip them here
+ // to avoid path errors.
+ if (path.charAt(0) == '"') {
+ path = path.substring(1);
+ }
+ if (path.charAt(path.length() - 1) == '"') {
+ path = path.substring(0, path.length() - 1);
+ }
+ }
+ return path;
+ }
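+
+ // For example (illustrative): on Windows an env value of "C:\tmp\token" (with the
+ // surrounding double quotes included in the value) is returned as C:\tmp\token.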
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
new file mode 100644
index 0000000..ea19f09
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Create hive metastore client for user.
+ */
+public final class HiveMetastoreUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreUtils.class);
+
+ private HiveMetastoreUtils() {}
+
+ public static HCatClient initializeHiveMetaStoreClient(String metastoreUri, String metastorePrincipal,
+ String hive2Principal) throws Exception {
+ try {
+ HiveConf hcatConf = createHiveConf(HiveDRUtils.getDefaultConf(),
+ metastoreUri, metastorePrincipal, hive2Principal);
+ return HCatClient.create(hcatConf);
+ } catch (IOException e) {
+ throw new Exception("Exception creating HCatClient: " + e.getMessage(), e);
+ }
+ }
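+
+ /* A minimal usage sketch (illustrative; the URI is a placeholder): on a non-secure
+ cluster the principals may be left empty:
+
+ HCatClient client = HiveMetastoreUtils.initializeHiveMetaStoreClient(
+ "thrift://metastore-host:9083", null, null);
+ */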
+
+ private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal,
+ String hive2Principal) throws IOException {
+ JobConf jobConf = new JobConf(conf);
+ String delegationToken = HiveDRUtils.getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
+ if (delegationToken != null) {
+ Credentials credentials = Credentials.readTokenStorageFile(new File(delegationToken), conf);
+ jobConf.setCredentials(credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ }
+
+ HiveConf hcatConf = new HiveConf(jobConf, HiveConf.class);
+
+ hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+ hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+ hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname, EximReplicationTaskFactory.class.getName());
+ if (StringUtils.isNotEmpty(metastorePrincipal)) {
+ hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal);
+ hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+ hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true");
+ hcatConf.set("hadoop.rpc.protection", "authentication");
+ }
+ if (StringUtils.isNotEmpty(hive2Principal)) {
+ hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal);
+ hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos");
+ }
+
+ return hcatConf;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
new file mode 100644
index 0000000..bb33772
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Object to store replication status of a DB or a table.
+ */
+public class ReplicationStatus {
+
+ public static final int INDENT_FACTOR = 4;
+ private static final String SOURCE = "sourceUri";
+ private static final String TARGET = "targetUri";
+ private static final String JOB_NAME = "jobName";
+ private static final String DATABASE = "database";
+ private static final String TABLE = "table";
+ private static final String EVENT_ID = "eventId";
+ private static final String STATUS_KEY = "status";
+ private static final String STATUS_LOG = "statusLog";
+
+ /**
+ * Replication Status enum.
+ */
+ public enum Status {
+ INIT,
+ SUCCESS,
+ FAILURE
+ }
+
+ private String sourceUri;
+ private String targetUri;
+ private String jobName;
+ private String database;
+ private String table;
+ private Status status = Status.SUCCESS;
+ private long eventId = -1;
+ private String log;
+
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ public ReplicationStatus(String sourceUri, String targetUri, String jobName,
+ String database, String table,
+ ReplicationStatus.Status status, long eventId) throws HiveReplicationException {
+ init(sourceUri, targetUri, jobName, database, table, status, eventId, null);
+ }
+
+ private void init(String source, String target, String job,
+ String dbName, String tableName, ReplicationStatus.Status replStatus,
+ long eventNum, String logStr) throws HiveReplicationException {
+ setSourceUri(source);
+ setTargetUri(target);
+ setJobName(job);
+ setDatabase(dbName);
+ setTable(tableName);
+ setStatus(replStatus);
+ setEventId(eventNum);
+ setLog(logStr);
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ public ReplicationStatus(String jsonString) throws HiveReplicationException {
+ try {
+ JSONObject object = new JSONObject(jsonString);
+ Status objectStatus;
+ try {
+ objectStatus = ReplicationStatus.Status.valueOf(object.getString(STATUS_KEY).toUpperCase());
+ } catch (IllegalArgumentException e1) {
+ throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus."
+ + " Invalid status " + object.getString(STATUS_KEY), e1);
+ }
+
+ init(object.getString(SOURCE), object.getString(TARGET), object.getString(JOB_NAME),
+ object.getString(DATABASE), object.has(TABLE) ? object.getString(TABLE) : null,
+ objectStatus, object.has(EVENT_ID) ? object.getLong(EVENT_ID) : -1,
+ object.has(STATUS_LOG) ? object.getString(STATUS_LOG) : null);
+ } catch (JSONException e) {
+ throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus ", e);
+ }
+ }
+
+ public String toJsonString() throws HiveReplicationException {
+ try {
+ return toJsonObject().toString(INDENT_FACTOR);
+ } catch (JSONException e) {
+ throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e);
+ }
+ }
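+
+ /* Illustrative serialized form (values are placeholders; table and statusLog are
+ omitted when empty):
+ {
+ "sourceUri": "hive://source:9083",
+ "targetUri": "hive://target:9083",
+ "jobName": "hiveDRJob",
+ "database": "salesdb",
+ "table": "orders",
+ "status": "SUCCESS",
+ "eventId": 125
+ }
+ */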
+
+ public JSONObject toJsonObject() throws HiveReplicationException {
+ JSONObject jsonObject = new JSONObject();
+ try {
+ jsonObject.put(SOURCE, this.sourceUri);
+ jsonObject.put(TARGET, this.targetUri);
+ jsonObject.put(JOB_NAME, this.jobName);
+ jsonObject.put(DATABASE, this.database);
+ if (StringUtils.isNotEmpty(this.table)) {
+ jsonObject.put(TABLE, this.table);
+ }
+ jsonObject.put(STATUS_KEY, this.status.name());
+ // eventId is initialized to -1 and setEventId() ignores smaller values, so this is
+ // always the current value.
+ jsonObject.put(EVENT_ID, this.eventId);
+ if (StringUtils.isNotEmpty(this.log)) {
+ jsonObject.put(STATUS_LOG, this.log);
+ }
+ return jsonObject;
+ } catch (JSONException e) {
+ throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e);
+ }
+ }
+
+ public String getSourceUri() {
+ return this.sourceUri;
+ }
+
+ public void setSourceUri(String source) throws HiveReplicationException {
+ validateString(SOURCE, source);
+ this.sourceUri = source;
+ }
+
+ public String getTargetUri() {
+ return this.targetUri;
+ }
+
+ public void setTargetUri(String target) throws HiveReplicationException {
+ validateString(TARGET, target);
+ this.targetUri = target;
+ }
+
+ public String getJobName() {
+ return this.jobName;
+ }
+
+ public void setJobName(String jobName) throws HiveReplicationException {
+ validateString(JOB_NAME, jobName);
+ this.jobName = jobName;
+ }
+
+ public String getDatabase() {
+ return this.database;
+ }
+
+ public void setDatabase(String database) throws HiveReplicationException {
+ validateString(DATABASE, database);
+ this.database = database.toLowerCase();
+ }
+
+ public String getTable() {
+ return this.table;
+ }
+
+ public void setTable(String table) {
+ this.table = (table == null) ? null : table.toLowerCase();
+ }
+
+ public Status getStatus() {
+ return this.status;
+ }
+
+ public void setStatus(Status status) throws HiveReplicationException {
+ if (status != null) {
+ this.status = status;
+ } else {
+ throw new HiveReplicationException("Failed to set ReplicationStatus. Input \""
+ + STATUS_KEY + "\" cannot be empty");
+ }
+ }
+
+ public long getEventId() {
+ return this.eventId;
+ }
+
+ public void setEventId(long eventId) throws HiveReplicationException {
+ if (eventId > -1) {
+ this.eventId = eventId;
+ }
+ }
+
+ public String getLog() {
+ return this.log;
+ }
+
+ public void setLog(String log) {
+ this.log = log;
+ }
+
+ private void validateString(String inputName, String input) throws HiveReplicationException {
+ if (StringUtils.isEmpty(input)) {
+ throw new HiveReplicationException("Failed to set ReplicationStatus. Input \""
+ + inputName + "\" cannot be empty");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return sourceUri + "\t" + targetUri + "\t" + jobName + "\t"
+ + database + "\t" + table + "\t" + status + "\t" + eventId;
+ }
+
+}