You are viewing a plain text version of this content. The canonical version is the original HTML archive page (its hyperlink is not preserved in this plain-text view).
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/11/17 13:38:32 UTC
falcon git commit: FALCON-1486 Add Unit Test cases for HiveDR.
Contributed by Peeyush Bishnoi.
Repository: falcon
Updated Branches:
refs/heads/master af2ea867d -> 2d5516ec3
FALCON-1486 Add Unit Test cases for HiveDR. Contributed by Peeyush Bishnoi.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2d5516ec
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2d5516ec
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2d5516ec
Branch: refs/heads/master
Commit: 2d5516ec374333a7b39de7302c5cd511ba6ba9c8
Parents: af2ea86
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Nov 17 16:10:38 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Nov 17 16:10:38 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
addons/hivedr/pom.xml | 12 +
.../java/org/apache/falcon/hive/HiveDRTool.java | 9 +-
.../falcon/hive/MetaStoreEventSourcer.java | 20 +-
.../java/org/apache/falcon/hive/HiveDRTest.java | 252 +++++++++++++++++++
pom.xml | 1 +
6 files changed, 284 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 612b3c9..18cf582 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,8 @@ Trunk (Unreleased)
FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
IMPROVEMENTS
+ FALCON-1486 Add Unit Test cases for HiveDR (Peeyush Bishnoi via Ajay Yadava)
+
FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava via Pallavi Rao)
FALCON-1593 Oozie setup failing in setup phase (Praveen Adlakha via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index c887166..f98e8c4 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -183,6 +183,18 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>derby.stream.error.file</name>
+ <value>target/derby.log</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index c3d9b5c..df16c40 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -30,6 +30,7 @@ import org.apache.falcon.hive.util.EventSourcerUtils;
import org.apache.falcon.hive.util.FileUtils;
import org.apache.falcon.hive.util.HiveDRStatusStore;
import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.HiveMetastoreUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.api.HCatClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -257,8 +259,11 @@ public class HiveDRTool extends Configured implements Tool {
+inputOptions.getJobName()+".id";
Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile));
try {
- defaultSourcer = new MetaStoreEventSourcer(inputOptions.getSourceMetastoreUri(),
- inputOptions.getSourceMetastoreKerberosPrincipal(), inputOptions.getSourceHive2KerberosPrincipal(),
+ HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(
+ inputOptions.getSourceMetastoreUri(),
+ inputOptions.getSourceMetastoreKerberosPrincipal(),
+ inputOptions.getSourceHive2KerberosPrincipal());
+ defaultSourcer = new MetaStoreEventSourcer(sourceMetastoreClient,
new DefaultPartitioner(drStore, eventSoucerUtil), eventSoucerUtil, lastEventsIdMap);
inputFilename = defaultSourcer.sourceEvents(inputOptions);
} finally {
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
index 0e12e89..f008883 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
@@ -21,7 +21,6 @@ package org.apache.falcon.hive;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.hive.util.EventSourcerUtils;
import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.falcon.hive.util.HiveMetastoreUtils;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.repl.ReplicationTask;
import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
@@ -49,14 +48,11 @@ public class MetaStoreEventSourcer implements EventSourcer {
private long lastCounter;
/* TODO handle cases when no events. files will be empty and lists will be empty */
- public MetaStoreEventSourcer(String sourceMetastoreUri, String sourceMetastoreKerberosPrincipal,
- String sourceHive2KerberosPrincipal, Partitioner partitioner,
- EventSourcerUtils eventSourcerUtils, Map<String, Long> lastEventsIdMap)
- throws Exception {
-
- sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(sourceMetastoreUri,
- sourceMetastoreKerberosPrincipal, sourceHive2KerberosPrincipal);
- eventMetadata = new ReplicationEventMetadata();
+ public MetaStoreEventSourcer(HCatClient sourceMetastoreClient, Partitioner partitioner,
+ EventSourcerUtils eventSourcerUtils,
+ Map<String, Long> lastEventsIdMap) throws Exception {
+ this.sourceMetastoreClient = sourceMetastoreClient;
+ this.eventMetadata = new ReplicationEventMetadata();
this.partitioner = partitioner;
this.eventSourcerUtils = eventSourcerUtils;
this.lastEventsIdMap = lastEventsIdMap;
@@ -149,7 +145,7 @@ public class MetaStoreEventSourcer implements EventSourcer {
}
- private void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
+ protected void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
String tableName, String srcStagingDirProvider,
String dstStagingDirProvider) throws Exception {
String srcFilename = null;
@@ -196,6 +192,10 @@ public class MetaStoreEventSourcer implements EventSourcer {
EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, tableName, srcFilename, tgtFilename);
}
+ /**
+ * Writes the replication event metadata accumulated by this sourcer (via
+ * processTableReplicationEvents / updateEventMetadata) to a meta file, delegating the actual
+ * write to {@link EventSourcerUtils#persistToMetaFile}.
+ *
+ * @param jobName job name forwarded unchanged to EventSourcerUtils
+ * @return whatever EventSourcerUtils returns for the written file (presumably its path —
+ *         NOTE(review): confirm against EventSourcerUtils)
+ * @throws Exception propagated from EventSourcerUtils
+ */
+ public String persistToMetaFile(String jobName) throws Exception {
+ return eventSourcerUtils.persistToMetaFile(eventMetadata, jobName);
+ }
+
public void cleanUp() throws Exception {
if (sourceMetastoreClient != null) {
sourceMetastoreClient.close();
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
new file mode 100644
index 0000000..cdeddaa
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import com.google.common.base.Function;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.hadoop.JailedFileSystem;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.DelimiterUtils;
+import org.apache.falcon.hive.util.EventSourcerUtils;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for Hive DR export and import.
+ */
+public class HiveDRTest {
+    /** Job name shared by the EventSourcerUtils instance and the persisted meta file. */
+    private static final String JOB_NAME = "hiveReplTest";
+
+    private FileSystem fileSystem;
+    private HCatClient client;
+    private MetaStoreEventSourcer sourcer;
+    private EmbeddedCluster cluster;
+    private final String dbName = "testdb";
+    private final String tableName = "testtable";
+    private StagingDirectoryProvider stagingDirectoryProvider;
+    private final MessageFactory msgFactory = MessageFactory.getInstance();
+
+    /** Dummy mapping used for all db and table name mappings: maps {@code s} to {@code s + reverse(s)}. */
+    private final Function<String, String> debugMapping = new Function<String, String>() {
+        @Nullable
+        @Override
+        public String apply(@Nullable String s) {
+            if (s == null) {
+                return null;
+            }
+            return s + new StringBuilder(s).reverse().toString();
+        }
+    };
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        client = HCatClient.create(new HiveConf());
+        initializeFileSystem();
+        // No partitioner and no last-event-id map are needed: the tests call
+        // processTableReplicationEvents directly.
+        sourcer = new MetaStoreEventSourcer(client, null,
+                new EventSourcerUtils(cluster.getConf(), false, JOB_NAME), null);
+        stagingDirectoryProvider = new StagingDirectoryProvider.TrivialImpl("/tmp", "/");
+    }
+
+    /** Creates the embedded cluster and a jailed local file system with a fresh DR store directory. */
+    private void initializeFileSystem() throws Exception {
+        cluster = EmbeddedCluster.newCluster("hivedr");
+        fileSystem = new JailedFileSystem();
+        Path storePath = new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH);
+        fileSystem.initialize(LocalFileSystem.getDefaultUri(cluster.getConf()), cluster.getConf());
+        if (fileSystem.exists(storePath)) {
+            fileSystem.delete(storePath, true);
+        }
+        FileSystem.mkdirs(fileSystem, storePath, DRStatusStore.DEFAULT_STORE_PERMISSION);
+        // The instance is not used afterwards; it is constructed only for whatever initialisation
+        // its constructor performs on the store path. NOTE(review): confirm that side effect is needed.
+        new HiveDRStatusStore(fileSystem, fileSystem.getFileStatus(storePath).getGroup());
+    }
+
+    @Test
+    public void testExportImportReplication() throws Exception {
+        Table t = newTestTable();
+        ReplicationTask rtask = createReplicationTask(t, HCatConstants.HCAT_CREATE_TABLE_EVENT,
+                msgFactory.buildCreateTableMessage(t).toString());
+        verifyExportImportReplicationTask(rtask);
+    }
+
+    /** A create-table task needs staging dirs and yields EXPORT commands on source, IMPORT on target. */
+    private void verifyExportImportReplicationTask(ReplicationTask rtask) throws Exception {
+        Assert.assertTrue(rtask.needsStagingDirs());
+        Assert.assertFalse(rtask.isActionable());
+
+        rtask.withSrcStagingDirProvider(stagingDirectoryProvider)
+                .withDstStagingDirProvider(stagingDirectoryProvider)
+                .withDbNameMapping(debugMapping)
+                .withTableNameMapping(debugMapping);
+
+        String[] fields = processAndReadMetaFields(rtask);
+        for (Command cmd : readCommands(fields[2])) {
+            assertStatementsStartWith(cmd, "EXPORT TABLE");
+        }
+        for (Command cmd : readCommands(fields[3])) {
+            assertStatementsStartWith(cmd, "IMPORT TABLE");
+        }
+    }
+
+    @Test
+    public void testImportReplication() throws Exception {
+        Table t = newTestTable();
+        ReplicationTask rtask = createReplicationTask(t, HCatConstants.HCAT_DROP_TABLE_EVENT,
+                msgFactory.buildDropTableMessage(t).toString());
+        verifyImportReplicationTask(rtask);
+    }
+
+    /** A drop-table task is directly actionable: no export statements, DROP statements on target. */
+    private void verifyImportReplicationTask(ReplicationTask rtask) throws Exception {
+        Assert.assertFalse(rtask.needsStagingDirs());
+        Assert.assertTrue(rtask.isActionable());
+        rtask.withDbNameMapping(debugMapping)
+                .withTableNameMapping(debugMapping);
+
+        String[] fields = processAndReadMetaFields(rtask);
+        for (Command cmd : readCommands(fields[2])) {
+            // In case of drop the export side is a pure metadata operation, so it has no statements.
+            Assert.assertEquals(cmd.get().size(), 0);
+        }
+        for (Command cmd : readCommands(fields[3])) {
+            assertStatementsStartWith(cmd, "DROP TABLE");
+        }
+    }
+
+    /** Builds a metastore Table object for the test db/table names. */
+    private Table newTestTable() {
+        Table t = new Table();
+        t.setDbName(dbName);
+        t.setTableName(tableName);
+        return t;
+    }
+
+    /**
+     * Wraps the given event type and message in a notification event for {@code table} and turns it
+     * into a ReplicationTask, checking that the task exposes the same event.
+     */
+    private ReplicationTask createReplicationTask(Table table, String eventType, String message)
+        throws Exception {
+        NotificationEvent event = new NotificationEvent(getEventId(), getTime(), eventType, message);
+        event.setDbName(table.getDbName());
+        event.setTableName(table.getTableName());
+
+        HCatNotificationEvent hev = new HCatNotificationEvent(event);
+        ReplicationTask rtask = ReplicationTask.create(client, hev);
+        Assert.assertEquals(rtask.getEvent().toString(), hev.toString());
+        return rtask;
+    }
+
+    /**
+     * Feeds a single task through the sourcer, persists the meta file and returns its
+     * FIELD_DELIM-separated fields after checking that there are exactly four and that the first
+     * two decode to the test db and table names. Fields 2 and 3 are the source (export) and
+     * target (import) command file paths.
+     */
+    private String[] processAndReadMetaFields(ReplicationTask rtask) throws Exception {
+        List<ReplicationTask> tasks = new ArrayList<ReplicationTask>();
+        tasks.add(rtask);
+        sourcer.processTableReplicationEvents(tasks.iterator(), dbName, tableName,
+                stagingDirectoryProvider.toString(), stagingDirectoryProvider.toString());
+
+        String metaFileName = sourcer.persistToMetaFile(JOB_NAME);
+        String[] fields = readEventFile(new Path(metaFileName)).split(DelimiterUtils.FIELD_DELIM);
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(fields.length, 4);
+        Assert.assertEquals(decodeBase64(fields[0]), dbName);
+        Assert.assertEquals(decodeBase64(fields[1]), tableName);
+        return fields;
+    }
+
+    /** Reads a command file, deserializes one Command per record and checks each carries our event id. */
+    private List<Command> readCommands(String commandFile) throws Exception {
+        String[] serialized = readEventFile(new Path(commandFile)).split(DelimiterUtils.NEWLINE_DELIM);
+        List<Command> commands = new ArrayList<Command>(serialized.length);
+        for (String s : serialized) {
+            Command cmd = ReplicationUtils.deserializeCommand(s);
+            Assert.assertEquals(cmd.getEventId(), getEventId());
+            commands.add(cmd);
+        }
+        return commands;
+    }
+
+    /** Asserts the command has at least one statement and that every statement starts with prefix. */
+    private static void assertStatementsStartWith(Command cmd, String prefix) {
+        List<String> statements = cmd.get();
+        // Without this check an empty statement list would make the loop below pass vacuously.
+        Assert.assertFalse(statements.isEmpty(), "expected statements starting with " + prefix);
+        for (String stmt : statements) {
+            Assert.assertTrue(stmt.startsWith(prefix), stmt);
+        }
+    }
+
+    /** Decodes a Base64 field written by the sourcer as UTF-8 text. */
+    private static String decodeBase64(String encoded) throws IOException {
+        return new String(Base64.decodeBase64(encoded), "UTF-8");
+    }
+
+    private long getEventId() {
+        // Does not need to be unique, just non-zero distinct value to test against.
+        return 42;
+    }
+
+    private int getTime() {
+        // Does not need to be actual time, just non-zero distinct value to test against.
+        return 1729;
+    }
+
+    /**
+     * Reads a UTF-8 text file from the test file system.
+     *
+     * BufferedReader.readLine() strips line terminators, so lines are re-joined with
+     * {@link DelimiterUtils#NEWLINE_DELIM}; callers split the result on that delimiter and would
+     * otherwise see a multi-line file as one record. A single-line file is returned unchanged.
+     */
+    private String readEventFile(Path eventFileName) throws IOException {
+        BufferedReader in = new BufferedReader(
+                new InputStreamReader(fileSystem.open(eventFileName), "UTF-8"));
+        try {
+            StringBuilder eventString = new StringBuilder();
+            String separator = "";
+            String line;
+            while ((line = in.readLine()) != null) {
+                eventString.append(separator).append(line);
+                separator = DelimiterUtils.NEWLINE_DELIM;
+            }
+            return eventString.toString();
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        // client is null if HCatClient.create failed in setup(); reset it so it is never closed twice.
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f2c480..dfaf1c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -307,6 +307,7 @@
<exclude>**/maven-eclipse.xml</exclude>
<exclude>**/.externalToolBuilders/**</exclude>
<exclude>html5-ui/**</exclude>
+ <exclude>**/metastore_db/**</exclude>
<exclude>**/db1.log</exclude>
<exclude>**/db1.properties</exclude>
<exclude>**/db1.script</exclude>