You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/03/22 11:23:27 UTC
[22/50] [abbrv] oozie git commit: OOZIE-2781 HCat partition available
notification is not sent to coordinator actions if coordinator job is using a
different hostname (cname, IP address, etc. ) for HCat URL
OOZIE-2781 HCat partition available notification is not sent to coordinator actions if coordinator job is using a different hostname (cname, IP address, etc. ) for HCat URL
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c52967df
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c52967df
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c52967df
Branch: refs/heads/oya
Commit: c52967dfb6883bb6469567a2f951bab0c7948855
Parents: 9035d91
Author: puru <pu...@gmail.com>
Authored: Sun Jan 29 19:09:45 2017 -0800
Committer: puru <pu...@gmail.com>
Committed: Sun Jan 29 19:09:45 2017 -0800
----------------------------------------------------------------------
.../hcat/EhcacheHCatDependencyCache.java | 42 +++++++++----
.../hcat/SimpleHCatDependencyCache.java | 63 ++++++++++++++++++--
core/src/main/resources/oozie-default.xml | 11 ++++
.../TestPartitionDependencyManagerService.java | 55 +++++++++++++++--
release-log.txt | 1 +
5 files changed, 148 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
index 3bc4675..6f0abf6 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
@@ -18,6 +18,7 @@
package org.apache.oozie.dependency.hcat;
+import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
@@ -40,6 +41,7 @@ import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.event.CacheEventListener;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
@@ -56,6 +58,8 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
private CacheManager cacheManager;
+ private boolean useCanonicalHostName = false;
+
/**
* Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
* WaitingAction) which is Serializable (for overflowToDisk)
@@ -100,18 +104,20 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
missingDepsByServer = new ConcurrentHashMap<String, Cache>();
partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
availableDeps = new ConcurrentHashMap<String, Collection<String>>();
+ useCanonicalHostName = ConfigurationService.getBoolean(SimpleHCatDependencyCache.USE_CANONICAL_HOSTNAME);
+
}
@Override
public void addMissingDependency(HCatURI hcatURI, String actionID) {
-
+ String serverName = canonicalizeHostname(hcatURI.getServer());
// Create cache for the server if we don't have one
- Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+ Cache missingCache = missingDepsByServer.get(serverName);
if (missingCache == null) {
CacheConfiguration clonedConfig = cacheConfig.clone();
- clonedConfig.setName(hcatURI.getServer());
+ clonedConfig.setName(serverName);
missingCache = new Cache(clonedConfig);
- Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
+ Cache exists = missingDepsByServer.putIfAbsent(serverName, missingCache);
if (exists == null) {
cacheManager.addCache(missingCache);
missingCache.getCacheEventNotificationService().registerListener(this);
@@ -148,7 +154,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
// Increment count for the partition key pattern
if (newlyAdded) {
- String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ hcatURI.getTable();
synchronized (partKeyPatterns) {
ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
@@ -170,7 +176,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
@Override
public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
- Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+ Cache missingCache = missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer()));
if (missingCache == null) {
LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
@@ -202,7 +208,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
}
// Decrement partition key pattern count if the cache entry is removed
if (decrement) {
- String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ hcatURI.getTable();
decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
}
@@ -212,7 +218,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
@Override
public Collection<String> getWaitingActions(HCatURI hcatURI) {
Collection<String> actionIDs = null;
- Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+ Cache missingCache = missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer()));
if (missingCache != null) {
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
@@ -221,7 +227,15 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
if (element != null) {
WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
actionIDs = new ArrayList<String>();
- String uriString = hcatURI.getURI().toString();
+ URI uri = hcatURI.getURI();
+ String uriString = null;
+ try {
+ uriString = new URI(uri.getScheme(), canonicalizeHostname(uri.getAuthority()), uri.getPath(),
+ uri.getQuery(), uri.getFragment()).toString();
+ }
+ catch (URISyntaxException e) {
+ uriString = hcatURI.toURIString();
+ }
for (WaitingAction action : waitingActions.getWaitingActions()) {
if (action.getDependencyURI().equals(uriString)) {
actionIDs.add(action.getActionID());
@@ -235,7 +249,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
@Override
public Collection<String> markDependencyAvailable(String server, String db, String table,
Map<String, String> partitions) {
- String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
+ String tableKey = canonicalizeHostname(server) + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
synchronized (partKeyPatterns) {
Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
if (patternCounts == null) {
@@ -512,8 +526,8 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
// Decrement partition key pattern count if the cache entry is removed
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String partKeys = sortedPKV.getPartKeys();
- String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
- + hcatURI.getTable();
+ String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb()
+ + TABLE_DELIMITER + hcatURI.getTable();
String hcatURIStr = hcatURI.toURIString();
decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr);
}
@@ -527,4 +541,8 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
// to be implemented when reverse-lookup data structure for purging is added
}
+ /**
+  * Maps an HCat server name (host or host:port) to the form used as a key in this cache, using the
+  * canonical-hostname setting loaded from {@code ConfigurationService} when the cache is initialized.
+  * Subclasses may override this to supply their own mapping.
+  *
+  * @param name server name taken from an HCat URI or a JMS notification
+  * @return the canonicalized server name, or {@code name} unchanged when canonicalization is off
+  */
+ public String canonicalizeHostname(String name) {
+     final boolean canonicalize = this.useCanonicalHostName;
+     return SimpleHCatDependencyCache.canonicalizeHostname(name, canonicalize);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
index 9e24c9a..1b2bd24 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
@@ -18,6 +18,9 @@
package org.apache.oozie.dependency.hcat;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,8 +34,8 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
@@ -43,6 +46,11 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
private static String DELIMITER = ";";
+ public static final String USE_CANONICAL_HOSTNAME ="oozie.service.HCatAccessorService.jms.use.canonical.hostname";
+ private boolean useCanonicalHostName = false;
+
+
+
/**
* Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition
* value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
@@ -65,11 +73,13 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
availableDeps = new ConcurrentHashMap<String, Collection<String>>();
actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>();
+ useCanonicalHostName = ConfigurationService.getBoolean(USE_CANONICAL_HOSTNAME);
}
@Override
public void addMissingDependency(HCatURI hcatURI, String actionID) {
- String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
+ String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER
+ + hcatURI.getTable();
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
// Partition keys seperated by ;. For eg: date;country;state
String partKey = sortedPKV.getPartKeys();
@@ -119,7 +129,8 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
@Override
public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
- String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
+ String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER
+ + hcatURI.getTable();
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String partKey = sortedPKV.getPartKeys();
String partVal = sortedPKV.getPartVals();
@@ -181,7 +192,8 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
@Override
public Collection<String> getWaitingActions(HCatURI hcatURI) {
- String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
+ String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER
+ + hcatURI.getTable();
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String partKey = sortedPKV.getPartKeys();
String partVal = sortedPKV.getPartVals();
@@ -198,7 +210,16 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
return null;
}
Collection<String> actionIDs = new ArrayList<String>();
- String uriString = hcatURI.toURIString();
+ URI uri = hcatURI.getURI();
+ String uriString = null;
+ try {
+ uriString = new URI(uri.getScheme(), canonicalizeHostname(uri.getAuthority()), uri.getPath(),
+ uri.getQuery(), uri.getFragment()).toString();
+ }
+ catch (URISyntaxException e) {
+ uriString = hcatURI.toURIString();
+ }
+
for (WaitingAction action : waitingActions) {
if (action.getDependencyURI().equals(uriString)) {
actionIDs.add(action.getActionID());
@@ -210,7 +231,7 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
@Override
public Collection<String> markDependencyAvailable(String server, String db, String table,
Map<String, String> partitions) {
- String tableKey = server + DELIMITER + db + DELIMITER + table;
+ String tableKey = canonicalizeHostname(server) + DELIMITER + db + DELIMITER + table;
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
LOG.warn("Got partition available notification for " + tableKey
@@ -411,4 +432,34 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
public void removeCoordActionWithDependenciesAvailable(String coordAction) {
actionPartitionMap.remove(coordAction);
}
+
+ /**
+  * Maps an HCat server name (host or host:port) to the form used in this cache's table keys, honoring
+  * the {@code USE_CANONICAL_HOSTNAME} setting loaded from {@code ConfigurationService}.
+  * Subclasses may override this to supply their own mapping.
+  *
+  * @param name server name taken from an HCat URI or a JMS notification
+  * @return the canonicalized server name, or {@code name} unchanged when canonicalization is off
+  */
+ public String canonicalizeHostname(String name) {
+     final boolean canonicalize = this.useCanonicalHostName;
+     return SimpleHCatDependencyCache.canonicalizeHostname(name, canonicalize);
+ }
+
+ /**
+  * Resolves the host part of {@code name} ({@code host} or {@code host:port}) to its canonical hostname
+  * via {@link InetAddress#getCanonicalHostName()}, keeping any port suffix unchanged.
+  * <p>
+  * A bracketed IPv6 literal such as {@code [::1]:5080} is split at the ':' after the closing bracket;
+  * a bare IPv6 literal (several colons, no brackets) is resolved as a whole. If the host cannot be
+  * resolved the failure is logged and {@code name} is returned unchanged, so callers always get a
+  * usable key.
+  *
+  * @param name server string, typically the authority of an HCat URI; may be {@code null}
+  * @param useCanonicalHostName whether to canonicalize at all
+  * @return the canonicalized server string, or {@code name} itself when canonicalization is disabled,
+  *         {@code name} is null/empty, or the host cannot be resolved
+  */
+ public static String canonicalizeHostname(String name, boolean useCanonicalHostName) {
+     if (!useCanonicalHostName || name == null || name.isEmpty()) {
+         return name;
+     }
+     String hostname = name;
+     String port = null;
+     int portSeparator = name.lastIndexOf(':');
+     // Treat the last ':' as the port separator only when it is the sole ':' (host:port) or directly
+     // follows a bracketed IPv6 literal ([addr]:port). Parsing once also avoids the previous
+     // split(":")[1], which threw ArrayIndexOutOfBoundsException for a trailing ':' ("host:").
+     if (portSeparator >= 0
+             && (name.indexOf(':') == portSeparator || name.charAt(portSeparator - 1) == ']')) {
+         hostname = name.substring(0, portSeparator);
+         port = name.substring(portSeparator + 1);
+     }
+     try {
+         // Name-service lookup; may block on DNS (results are subject to the JVM's InetAddress cache).
+         String canonicalHostName = InetAddress.getByName(hostname).getCanonicalHostName();
+         if (hostname.startsWith("[") && canonicalHostName.indexOf(':') >= 0) {
+             // Reverse lookup fell back to an IPv6 literal; keep the bracketed form of the input.
+             canonicalHostName = "[" + canonicalHostName + "]";
+         }
+         return port == null ? canonicalHostName : canonicalHostName + ":" + port;
+     }
+     catch (IOException ex) {
+         LOG.error("Unable to resolve canonical hostname for [{0}], using it as is", name, ex);
+         return name;
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index ff9da58..95e0c36 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -211,6 +211,17 @@
</description>
</property>
+ <!-- HCatAccessorService -->
+ <property>
+ <name>oozie.service.HCatAccessorService.jms.use.canonical.hostname</name>
+ <value>false</value>
+ <description>The JMS messages published by an HCat server usually contain the canonical hostname of the HCat server
+ in standalone mode, or the canonical name of the VIP when multiple nodes run in an HA setup. This setting is used
+ to translate the HCat server hostname, or any alias of it specified by the user in the HCat URIs of coordinator dependencies,
+ to its canonical name so that those URIs can be matched exactly against the JMS dependency availability notifications.
+ </description>
+ </property>
+
<!-- TopicService -->
<property>
http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
index a5d2ed9..6996779 100644
--- a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
@@ -20,16 +20,13 @@ package org.apache.oozie.service;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
+import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.client.CoordinatorAction.Status;
-import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.dependency.hcat.EhcacheHCatDependencyCache;
import org.apache.oozie.dependency.hcat.HCatMessageHandler;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
@@ -137,6 +134,39 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
}
+ /**
+  * Verifies that a dependency registered under one HCat server name is found when it is looked up
+  * through an alias of that server. The check runs for both the simple and the Ehcache-backed
+  * dependency caches. The "-Extended" cache subclasses stand in for DNS by mapping "-B" host names
+  * onto "-A".
+  * NOTE(review): those subclasses override canonicalizeHostname(String) and ignore the
+  * USE_CANONICAL_HOSTNAME flag set below, so the flag's own wiring is not exercised here.
+  */
+ @Test
+ public void testHCatCanonicalHostName() throws Exception {
+ ConfigurationService.setBoolean(SimpleHCatDependencyCache.USE_CANONICAL_HOSTNAME, true);
+ ConfigurationService.set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL,
+ SimpleHCatDependencyCacheExtended.class.getName());
+ services.init();
+
+ // Register a dependency via server "-A", then look it up via both "-A" and its alias "-B"
+ String actionId1 = "1";
+
+ String server1 = "hcat-server1-A:5080";
+ String server2 = "hcat-server1-B:5080";
+ String db = "mydb";
+ String table1 = "mytbl1";
+ HCatURI dep1 = new HCatURI(new URI("hcat://" + server1 + "/" + db + "/" + table1 + "/dt=20120101;country=us"));
+ HCatURI dep2 = new HCatURI(new URI("hcat://" + server2 + "/" + db + "/" + table1 + "/dt=20120101;country=us"));
+
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ addMissingDependencyAndRegister(dep1, actionId1, pdms);
+ assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
+ assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
+
+ // Repeat the same scenario against the Ehcache-backed cache implementation
+ ConfigurationService.set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL,
+ EhcacheHCatDependencyCacheExtended.class.getName());
+ services.init();
+
+ pdms = Services.get().get(PartitionDependencyManagerService.class);
+ addMissingDependencyAndRegister(dep1, actionId1, pdms);
+ assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
+ assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
+
+ }
+
protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) {
pdms.addMissingDependency(hcatURI, actionId);
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
@@ -198,3 +228,16 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
}
}
}
+
+/**
+ * Test double for {@link SimpleHCatDependencyCache} whose "name resolution" maps every "-B" alias onto
+ * the "-A" host, so hostname canonicalization can be exercised without real DNS.
+ */
+class SimpleHCatDependencyCacheExtended extends SimpleHCatDependencyCache {
+    @Override
+    public String canonicalizeHostname(String name) {
+        // A URI authority may be null; mirror the production helper and pass it through untouched.
+        return name == null ? null : name.replace("-B", "-A");
+    }
+}
+
+/**
+ * Test double for {@link EhcacheHCatDependencyCache} using the same "-B" to "-A" alias mapping as
+ * {@link SimpleHCatDependencyCacheExtended}.
+ */
+class EhcacheHCatDependencyCacheExtended extends EhcacheHCatDependencyCache {
+    @Override
+    public String canonicalizeHostname(String name) {
+        // A URI authority may be null; mirror the production helper and pass it through untouched.
+        return name == null ? null : name.replace("-B", "-A");
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 081ead8..2b806fc 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.4.0 release (trunk - unreleased)
+OOZIE-2781 HCat partition available notification is not sent to coordinator actions if coordinator job is using a different hostname (cname, IP address, etc. ) for HCat URL (puru)
OOZIE-2770 Show missing dependencies for coord actions (puru)
OOZIE-2630 Oozie Coordinator EL Functions to get first day of the week/month (satishsaley)
OOZIE-2771 Allow retrieving keystore and truststore passwords from Hadoop Credential Provider (asasvari via abhishekbafna)