You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/16 03:44:43 UTC
svn commit: r1446838 - in /oozie/branches/hcat-intre: ./ core/
core/src/main/java/org/apache/oozie/dependency/hcat/
core/src/main/resources/ core/src/test/java/org/apache/oozie/service/
core/src/test/resources/
Author: virag
Date: Sat Feb 16 02:44:42 2013
New Revision: 1446838
URL: http://svn.apache.org/r1446838
Log:
OOZIE-1181 Dependency cache with configurations for eviction, ttl and max elements in memory (rohini via virag)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
oozie/branches/hcat-intre/core/src/main/resources/ehcache-default.xml
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
oozie/branches/hcat-intre/core/src/test/resources/ehcache.xml
Modified:
oozie/branches/hcat-intre/core/pom.xml
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/branches/hcat-intre/pom.xml
oozie/branches/hcat-intre/release-log.txt
Modified: oozie/branches/hcat-intre/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1446838&r1=1446837&r2=1446838&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Sat Feb 16 02:44:42 2013
@@ -108,6 +108,12 @@
</dependency>
<dependency>
+ <groupId>net.sf.ehcache</groupId>
+ <artifactId>ehcache-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java?rev=1446838&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java Sat Feb 16 02:44:42 2013
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency.hcat;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import net.sf.ehcache.CacheException;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.event.CacheEventListener;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.HCatAccessorService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
+
+public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {
+
+ private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
+ private static String TABLE_DELIMITER = "#";
+ private static String PARTITION_DELIMITER = ";";
+
+ public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
+
+ private CacheManager cacheManager;
+ /**
+ * Cache with key server#db#table#pk1=val;pk2=val2 and value as WaitingActions (list of
+ * WaitingAction) which is Serializable (for overflowToDisk)
+ */
+ private Ehcache missingCache;
+ /**
+ * Map of server#db#table - sorted part key pattern - count of elements in the cache
+ */
+ private ConcurrentMap<String, Map<String, SettableInteger>> partKeyPatterns;
+ /**
+ * Map of actionIDs and collection of available URIs
+ */
+ private ConcurrentMap<String, Collection<String>> availableDeps;
+
+ @Override
+ public void init(Configuration conf) {
+ String cacheName = conf.get(CONF_CACHE_NAME);
+ URL cacheConfigURL;
+ if (cacheName == null) {
+ cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml");
+ cacheName = "dependency-default";
+ }
+ else {
+ cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml");
+ }
+ if (cacheConfigURL == null) {
+ throw new IllegalStateException("ehcache.xml is not found in classpath");
+ }
+ cacheManager = CacheManager.newInstance(cacheConfigURL);
+ missingCache = cacheManager.getCache(cacheName);
+ if (missingCache == null) {
+ throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
+ + " is not found");
+ }
+ missingCache.getCacheEventNotificationService().registerListener(this);
+ partKeyPatterns = new ConcurrentHashMap<String, Map<String, SettableInteger>>();
+ availableDeps = new ConcurrentHashMap<String, Collection<String>>();
+ }
+
+ @Override
+ public void addMissingDependency(HCatURI hcatURI, String actionID) {
+ SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
+ String partKeys = sortedPKV.getPartKeys();
+ String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ + hcatURI.getTable() + TABLE_DELIMITER + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
+ boolean newlyAdded = true;
+ synchronized (missingCache) {
+ Element element = missingCache.get(missingKey);
+ if (element == null) {
+ WaitingActions waitingActions = new WaitingActions();
+ element = new Element(missingKey, waitingActions);
+ Element exists = missingCache.putIfAbsent(element);
+ if (exists != null) {
+ newlyAdded = false;
+ waitingActions = (WaitingActions) exists.getObjectValue();
+ }
+ waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
+ }
+ else {
+ newlyAdded = false;
+ WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
+ waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
+ }
+ }
+ if (newlyAdded) {
+ String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ + hcatURI.getTable();
+ synchronized (partKeyPatterns) {
+ Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
+ if (patternCounts == null) {
+ patternCounts = new HashMap<String, SettableInteger>();
+ partKeyPatterns.put(tableKey, patternCounts);
+ }
+ SettableInteger count = patternCounts.get(partKeys);
+ if (count == null) {
+ patternCounts.put(partKeys, new SettableInteger(1));
+ }
+ else {
+ count.increment();
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
+ SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
+ String partKeys = sortedPKV.getPartKeys();
+ String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ + hcatURI.getTable() + TABLE_DELIMITER + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
+ boolean decrement = false;
+ boolean removed = false;
+ synchronized (missingCache) {
+ Element element = missingCache.get(missingKey);
+ if (element == null) {
+ LOG.debug("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
+ hcatURI.toURIString(), actionID);
+ return false;
+ }
+ Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
+ WaitingAction wAction = null;
+ for (WaitingAction action : waitingActions) {
+ if (action.getActionID().equals(actionID)) {
+ wAction = action;
+ }
+ }
+ removed = waitingActions.remove(wAction);
+ if (!removed) {
+ LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
+ hcatURI.toURIString(), actionID);
+ }
+ if (waitingActions.isEmpty()) {
+ missingCache.remove(missingKey);
+ decrement = true;
+ }
+ }
+ if (decrement) {
+ String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ + hcatURI.getTable();
+ decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
+ }
+ return removed;
+ }
+
+ @Override
+ public Collection<String> getWaitingActions(HCatURI hcatURI) {
+ SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
+ String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ + hcatURI.getTable() + TABLE_DELIMITER + sortedPKV.getPartKeys() + TABLE_DELIMITER
+ + sortedPKV.getPartVals();
+ Element element = missingCache.get(missingKey);
+ if (element == null) {
+ return null;
+ }
+ WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
+ Collection<String> actionIDs = new ArrayList<String>();
+ String uriString = hcatURI.getURI().toString();
+ for (WaitingAction action : waitingActions.getWaitingActions()) {
+ if (action.getDependencyURI().equals(uriString)) {
+ actionIDs.add(action.getActionID());
+ }
+ }
+ return actionIDs;
+ }
+
+ @Override
+ public Collection<String> markDependencyAvailable(String server, String db, String table,
+ Map<String, String> partitions) {
+ String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
+ Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
+ if (patternCounts == null) {
+ LOG.warn("Got partition available notification for " + tableKey
+ + ". Unexpected and should not be listening to topic. Unregistering topic");
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ hcatService.unregisterFromNotification(server, db, table);
+ return null;
+ }
+ Collection<String> actionsWithAvailDep = new HashSet<String>();
+ StringBuilder partValSB = new StringBuilder();
+ // If partition patterns are date, date;country and date;country;state,
+ // construct the partition values for each pattern and for the matching value in the
+ // missingCache, get the waiting actions and mark it as available.
+ for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
+ String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
+ partValSB.setLength(0);
+ for (String key : partKeys) {
+ partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
+ }
+ partValSB.setLength(partValSB.length() - 1);
+ String missingKey = tableKey + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER + partValSB.toString();
+ boolean removed = false;
+ Element element = null;
+ synchronized (missingCache) {
+ element = missingCache.get(missingKey);
+ if (element != null) {
+ missingCache.remove(missingKey);
+ removed = true;
+ }
+ }
+ if (removed) {
+ decrementPartKeyPatternCount(tableKey, entry.getKey(), missingKey);
+ Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
+ for (WaitingAction wAction : wActions) {
+ String actionID = wAction.getActionID();
+ actionsWithAvailDep.add(actionID);
+ Collection<String> depURIs = availableDeps.get(actionID);
+ if (depURIs == null) {
+ depURIs = new ArrayList<String>();
+ Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
+ if (existing != null) {
+ depURIs = existing;
+ }
+ }
+ synchronized (depURIs) {
+ depURIs.add(wAction.getDependencyURI());
+ availableDeps.put(actionID, depURIs);
+ }
+ }
+ }
+ }
+ return actionsWithAvailDep;
+ }
+
+ @Override
+ public Collection<String> getAvailableDependencyURIs(String actionID) {
+ return availableDeps.get(actionID);
+ }
+
+ @Override
+ public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
+ if (!availableDeps.containsKey(actionID)) {
+ return false;
+ }
+ else {
+ Collection<String> availList = availableDeps.get(actionID);
+ if (!availList.removeAll(dependencyURIs)) {
+ return false;
+ }
+ synchronized (availList) {
+ if (availList.isEmpty()) {
+ availableDeps.remove(actionID);
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void destroy() {
+ availableDeps.clear();
+ cacheManager.shutdown();
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ throw new CloneNotSupportedException();
+ }
+
+ @Override
+ public void dispose() {
+ }
+
+ @Override
+ public void notifyElementExpired(Ehcache cache, Element element) {
+ // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
+ String missingDepKey = (String) element.getObjectKey();
+ LOG.info("Cache entry [{0}] expired", missingDepKey);
+ onExpiryOrEviction(cache, element, missingDepKey);
+ }
+
+ @Override
+ public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {
+
+ }
+
+ @Override
+ public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
+ }
+
+ @Override
+ public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
+ }
+
+ @Override
+ public void notifyRemoveAll(Ehcache arg0) {
+ }
+
+ @Override
+ public void notifyElementEvicted(Ehcache cache, Element element) {
+ // Invoked when maxElementsInMemory is met
+ String missingDepKey = (String) element.getObjectKey();
+ LOG.info("Cache entry [{0}] evicted", missingDepKey);
+ onExpiryOrEviction(cache, element, missingDepKey);
+ }
+
+ private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
+ int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
+ int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
+ String tableKey = missingDepKey.substring(0, partKeyIndex);
+ String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
+ decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
+ }
+
+ private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
+ synchronized (partKeyPatterns) {
+ Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
+ if (patternCounts == null) {
+ LOG.debug("Removed dependency - Missing cache entry - uri={0}. "
+ + "But no corresponding pattern key table entry", hcatURI);
+ }
+ else {
+ SettableInteger count = patternCounts.get(partKeys);
+ if (count == null) {
+ LOG.debug("Removed dependency - Missing cache entry - uri={0}. "
+ + "But no corresponding pattern key entry", hcatURI);
+ }
+ else {
+ count.decrement();
+ if (count.getValue() == 0) {
+ patternCounts.remove(partKeys);
+ }
+ if (patternCounts.isEmpty()) {
+ partKeyPatterns.remove(tableKey);
+ // Close JMS session. Stop listening on topic
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ String[] tableDetails = tableKey.split(TABLE_DELIMITER);
+ hcatService.unregisterFromNotification(tableDetails[0], tableDetails[1], tableDetails[2]);
+ }
+ }
+ }
+ }
+ }
+
+ private static class SortedPKV {
+ private StringBuilder partKeys;
+ private StringBuilder partVals;
+
+ public SortedPKV(Map<String, String> partitions) {
+ this.partKeys = new StringBuilder();
+ this.partVals = new StringBuilder();
+ ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
+ Collections.sort(keys);
+ for (String key : keys) {
+ this.partKeys.append(key).append(PARTITION_DELIMITER);
+ this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
+ }
+ this.partKeys.setLength(partKeys.length() - 1);
+ this.partVals.setLength(partVals.length() - 1);
+ }
+
+ public String getPartKeys() {
+ return partKeys.toString();
+ }
+
+ public String getPartVals() {
+ return partVals.toString();
+ }
+
+ }
+
+ private static class SettableInteger {
+ private int value;
+
+ public SettableInteger(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public void increment() {
+ value++;
+ }
+
+ public void decrement() {
+ value--;
+ }
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java?rev=1446838&r1=1446837&r2=1446838&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java Sat Feb 16 02:44:42 2013
@@ -180,7 +180,9 @@ public class SimpleHCatDependencyCache i
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
LOG.warn("Got partition available notification for " + tableKey
- + ". Unexpected and should not be listening to topic. ");
+ + ". Unexpected and should not be listening to topic. Unregistering topic");
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ hcatService.unregisterFromNotification(server,db, table);
return null;
}
Collection<String> actionsWithAvailDep = new HashSet<String>();
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java?rev=1446838&r1=1446837&r2=1446838&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java Sat Feb 16 02:44:42 2013
@@ -30,10 +30,18 @@ public class WaitingAction implements Se
this.dependencyURI = dependencyURI;
}
+ /**
+ * Get the action id
+ * @return action id
+ */
public String getActionID() {
return actionID;
}
+ /**
+ * Get the dependency uri
+ * @return dependency uri
+ */
public String getDependencyURI() {
return dependencyURI;
}
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java?rev=1446838&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java Sat Feb 16 02:44:42 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency.hcat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class WaitingActions implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private ArrayList<WaitingAction> waitingActions;
+
+ public WaitingActions() {
+ waitingActions = new ArrayList<WaitingAction>();
+ }
+
+ /**
+ * Get the list of waiting actions
+ * @return waiting actions
+ */
+ public Collection<WaitingAction> getWaitingActions() {
+ return waitingActions;
+ }
+
+ /**
+ * Add a waiting action
+ * @param waitingAction waiting action
+ */
+ public void add(WaitingAction waitingAction) {
+ waitingActions.add(waitingAction);
+ }
+
+}
Added: oozie/branches/hcat-intre/core/src/main/resources/ehcache-default.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/ehcache-default.xml?rev=1446838&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/ehcache-default.xml (added)
+++ oozie/branches/hcat-intre/core/src/main/resources/ehcache-default.xml Sat Feb 16 02:44:42 2013
@@ -0,0 +1,10 @@
+<ehcache>
+ <!-- http://svn.codehaus.org/sonar/tags/1.1/sonar-commons/src/main/resources/ehcache.xml -->
+ <diskStore path="java.io.tmpdir"/>
+ <!-- Evict cache entry after 2 days (timeToLiveSeconds) -->
+ <!-- Note: overflowToDisk=true does not work well with timeToIdleSeconds or timeToLiveSeconds.
+ Returns null even for unexpired entries -->
+ <cache name="dependency-default" maxElementsInMemory="60000" eternal="false"
+ overflowToDisk="false" timeToIdleSeconds="0" timeToLiveSeconds="172800"
+ diskPersistent="false" diskExpiryThreadIntervalSeconds="1200" />
+</ehcache>
\ No newline at end of file
Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java?rev=1446838&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java Sat Feb 16 02:44:42 2013
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.Collection;
+
+import org.apache.oozie.dependency.hcat.EhcacheHCatDependencyCache;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
+import org.junit.Test;
+
+public class TestPartitionDependencyManagerEhcache extends TestPartitionDependencyManagerService {
+
+ private static XLog LOG = XLog.getLog(TestPartitionDependencyManagerEhcache.class);
+
+ private void setupServices(String cacheName) throws ServiceException {
+ Services.get().destroy();
+ services = super.setupServicesForHCatalog();
+ services.getConf().set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL,
+ EhcacheHCatDependencyCache.class.getName());
+ if (cacheName != null) {
+ services.getConf().set(EhcacheHCatDependencyCache.CONF_CACHE_NAME, cacheName);
+ }
+ services.init();
+ }
+
+ @Override
+ @Test
+ public void testPartitionDependency() throws Exception {
+ setupServices(null);
+ super.testPartitionDependency();
+ }
+
+ @Override
+ @Test
+ public void testMemoryUsageAndSpeed() throws Exception {
+ // use all small case. Configured insrc/test/resources/ehcache.xml
+ setupServices("testnospilltodisk");
+ assertSpeedAndMemory(60000, 4500, 2000, 45000000, 40000000);
+ }
+
+ @Test
+ public void testMemoryUsageAndSpeedOverflowToDisk() throws Exception {
+ setupServices("testspilltodisk"); // maxElementsInMemory="20000". 2/3 on disk
+ // Insert and retrieve are between 15-30 seconds
+ // When run individually memIncreaseAfterInsert is < 45MB. But running with
+ // all tests it goes to 60MB.
+ assertSpeedAndMemory(60000, 30000, 11000, 60000000, 25000000);
+ }
+
+ @Test
+ public void testEvictionOnTimeToIdle() throws Exception {
+ setupServices("testevictionontimetoidle");
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ int numItems = 50;
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < numItems; i++) {
+ HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + i);
+ pdms.addMissingDependency(dep, "" + i);
+ }
+ verifyWaitingAction(pdms, numItems);
+ LOG.info("Time taken to insert and retrive " + numItems + " items is "
+ + (System.currentTimeMillis() - startTime));
+ // timeToIdleSeconds is 1
+ Thread.sleep(1000);
+ for (int i = 0; i < numItems; i++) {
+ assertNull(pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)));
+ }
+ }
+
+ @Test
+ public void testEvictionOnTimeToLive() throws Exception {
+ setupServices("testevictionontimetolive");
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ int numItems = 50;
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < numItems; i++) {
+ HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + i);
+ pdms.addMissingDependency(dep, "" + i);
+ }
+ verifyWaitingAction(pdms, numItems);
+ LOG.info("Time taken to insert and retrive " + numItems + " items is "
+ + (System.currentTimeMillis() - startTime));
+ // timeToLiveSeconds is 1
+ Thread.sleep(1000);
+ for (int i = 0; i < numItems; i++) {
+ assertNull(pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)));
+ }
+ }
+
+ @Test
+ public void testMaxElementsInMemory() throws Exception {
+ setupServices("testmaxelementsinmemory"); // maxElementsInMemory="500" overflowToDisk="false"
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ int numItems = 1000;
+ for (int i = 0; i < numItems; i++) {
+ HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + i);
+ pdms.addMissingDependency(dep, "" + i);
+ }
+ // First 500 should have been evicted. But it is LRU and the next 100 removed is between 350 and 650.
+ for (int i = 0; i < 350; i++) {
+ assertNull(pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)));
+ }
+ int evicted = 0;
+ for (int i = 350; i < 650; i++) {
+ if (pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)) == null) {
+ evicted++;
+ }
+ }
+ assertEquals(150, evicted);
+ for (int i = 650; i < 1000; i++) {
+ String actionID = "" + i;
+ HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + actionID);
+ Collection<String> waitingActions = pdms.getWaitingActions(dep);
+ assertNotNull(dep.toURIString() + " is missing in cache", waitingActions);
+ assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID));
+ }
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1446838&r1=1446837&r2=1446838&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java Sat Feb 16 02:44:42 2013
@@ -106,9 +106,16 @@ public class TestPartitionDependencyMana
assertNull(pdms.getAvailableDependencyURIs(actionId3));
}
+ @Test
public void testMemoryUsageAndSpeed() throws Exception {
+ // 2 to 4 seconds to insert 60K and 1 to 2 seconds to retrieve 60K
+ // 35-45MB for 60K entries
+ assertSpeedAndMemory(60000, 4000, 2000, 45000000, 40000000);
+ }
+
+ protected void assertSpeedAndMemory(int numItems, int insertTimeinMillis, int retrievalTimeinMillis,
+ long memIncreaseAfterInsert, long memIncreaseAfterInsertAndGC) throws Exception {
PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
- int numItems = 60000;
System.gc();
MemoryMXBean mb = ManagementFactory.getMemoryMXBean();
long usedMemBeforeInsert = mb.getHeapMemoryUsage().getUsed();
@@ -120,14 +127,12 @@ public class TestPartitionDependencyMana
long usedMemAfterInsert = mb.getHeapMemoryUsage().getUsed();
long endTime = System.currentTimeMillis();
LOG.info("Time taken to insert " + numItems + " items is " + (endTime - startTime));
- assertTrue((endTime - startTime) < 4000); // 2 to 4 seconds to insert 60K
+ assertTrue((endTime - startTime) < insertTimeinMillis);
LOG.info("Memory before and after insert: " + usedMemBeforeInsert + "," + usedMemAfterInsert);
- for (int i = 0; i < numItems; i++) {
- verifyWaitingAction(pdms, "" + i);
- }
+ verifyWaitingAction(pdms, numItems);
LOG.info("Time taken to retrieve " + numItems + " items is " + (System.currentTimeMillis() - endTime));
- assertTrue((System.currentTimeMillis() - endTime) < 2000); // 1 to 2 seconds to retrieve 60K
+ assertTrue((System.currentTimeMillis() - endTime) < retrievalTimeinMillis);
long usedMemAfterRetrieval = mb.getHeapMemoryUsage().getUsed();
System.gc();
@@ -138,14 +143,19 @@ public class TestPartitionDependencyMana
LOG.info("Memory after retrieval = " + usedMemAfterRetrieval);
LOG.info("Memory after GC = " + usedMemAfterGC);
- assertTrue((usedMemAfterInsert - usedMemBeforeInsert) < 45000000); //35-45MB for 60K entries
+ // Commenting out as memory assertion is not reliable when running the full suite of tests.
+ //assertTrue((usedMemAfterInsert - usedMemBeforeInsert) < memIncreaseAfterInsert);
+ //assertTrue((usedMemAfterGC - usedMemBeforeInsert) < memIncreaseAfterInsertAndGC);
}
- protected void verifyWaitingAction(PartitionDependencyManagerService pdms, String actionID) throws URISyntaxException {
- HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + actionID);
- Collection<String> waitingActions = pdms.getWaitingActions(dep);
- assertNotNull(dep.toURIString() + " is missing in cache", waitingActions);
- assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID));
+ protected void verifyWaitingAction(PartitionDependencyManagerService pdms, int numItems) throws URISyntaxException {
+ for (int i = 0; i < numItems; i++) {
+ String actionID = "" + i;
+ HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + actionID);
+ Collection<String> waitingActions = pdms.getWaitingActions(dep);
+ assertNotNull(dep.toURIString() + " is missing in cache", waitingActions);
+ assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID));
+ }
}
}
Added: oozie/branches/hcat-intre/core/src/test/resources/ehcache.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/resources/ehcache.xml?rev=1446838&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/resources/ehcache.xml (added)
+++ oozie/branches/hcat-intre/core/src/test/resources/ehcache.xml Sat Feb 16 02:44:42 2013
@@ -0,0 +1,22 @@
+<ehcache>
+ <!-- http://svn.codehaus.org/sonar/tags/1.1/sonar-commons/src/main/resources/ehcache.xml -->
+ <diskStore path="target"/>
+ <cache name="testspilltodisk" maxElementsInMemory="20000" eternal="false"
+ overflowToDisk="true" timeToIdleSeconds="500" timeToLiveSeconds="0"
+ diskPersistent="false" diskExpiryThreadIntervalSeconds="120" />
+ <cache name="testnospilltodisk" maxElementsInMemory="0" eternal="false"
+ overflowToDisk="false" timeToIdleSeconds="500" timeToLiveSeconds="0"
+ diskPersistent="false" diskExpiryThreadIntervalSeconds="120" />
+ <!-- Note: overflowToDisk=true does not work well with timeToIdleSeconds or timeToLiveSeconds.
+ Returns null even for unexpired entries. If maxElementsInMemory="5" and overflowToDisk="true"
+ testevictionontimetoidle and testevictionontimetolive tests will fail -->
+ <cache name="testevictionontimetoidle" maxElementsInMemory="0" eternal="false"
+ overflowToDisk="false" timeToIdleSeconds="1" timeToLiveSeconds="0"
+ diskPersistent="false" diskExpiryThreadIntervalSeconds="120" />
+ <cache name="testevictionontimetolive" maxElementsInMemory="0" eternal="false"
+ overflowToDisk="false" timeToIdleSeconds="0" timeToLiveSeconds="1"
+ diskPersistent="false" diskExpiryThreadIntervalSeconds="120" />
+ <cache name="testmaxelementsinmemory" maxElementsInMemory="500" eternal="false"
+ overflowToDisk="false" timeToIdleSeconds="0" timeToLiveSeconds="0"
+ diskPersistent="false" diskExpiryThreadIntervalSeconds="120" />
+</ehcache>
Modified: oozie/branches/hcat-intre/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/pom.xml?rev=1446838&r1=1446837&r2=1446838&view=diff
==============================================================================
--- oozie/branches/hcat-intre/pom.xml (original)
+++ oozie/branches/hcat-intre/pom.xml Sat Feb 16 02:44:42 2013
@@ -341,6 +341,12 @@
</dependency>
<dependency>
+ <groupId>net.sf.ehcache</groupId>
+ <artifactId>ehcache-core</artifactId>
+ <version>2.6.3</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-streaming</artifactId>
<version>${streaming.version}</version>
Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1446838&r1=1446837&r2=1446838&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Sat Feb 16 02:44:42 2013
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1181 Dependency cache with configurations for eviction, ttl and max elements in memory (rohini via virag)
OOZIE-1054 Create script to properly upload sharelib to HDFS (bowenzhangusa via tucu)
OOZIE-1217 Address review comments in OOZIE-1210 (rohini via virag)
OOZIE-1189 add filter option to specify JobID and AppName in SLA CLI command (egashira via mona)