You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2006/10/27 16:16:15 UTC
svn commit: r468388 [2/2] - in /jackrabbit/trunk/contrib/spi:
jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/
jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/lock/
jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/observation/
jcr2spi/src/ma...
Modified: jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java?view=diff&rev=468388&r1=468387&r2=468388
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java (original)
+++ jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java Fri Oct 27 07:16:13 2006
@@ -68,9 +68,8 @@
import org.apache.jackrabbit.webdav.ordering.OrderingConstants;
import org.apache.jackrabbit.webdav.observation.SubscriptionInfo;
import org.apache.jackrabbit.webdav.observation.EventType;
-import org.apache.jackrabbit.webdav.observation.Filter;
-import org.apache.jackrabbit.webdav.observation.ObservationConstants;
import org.apache.jackrabbit.webdav.observation.EventDiscovery;
+import org.apache.jackrabbit.webdav.observation.ObservationConstants;
import org.apache.jackrabbit.webdav.security.SecurityConstants;
import org.apache.jackrabbit.webdav.security.CurrentUserPrivilegeSetProperty;
import org.apache.jackrabbit.webdav.security.Privilege;
@@ -112,11 +111,9 @@
import org.apache.jackrabbit.spi.QNodeDefinition;
import org.apache.jackrabbit.spi.QPropertyDefinition;
import org.apache.jackrabbit.spi.QItemDefinition;
-import org.apache.jackrabbit.spi.EventListener;
-import org.apache.jackrabbit.spi.EventIterator;
-import org.apache.jackrabbit.spi.Event;
import org.apache.jackrabbit.spi.IdFactory;
import org.apache.jackrabbit.spi.LockInfo;
+import org.apache.jackrabbit.spi.EventBundle;
import org.apache.jackrabbit.util.Text;
import org.apache.jackrabbit.uuid.UUID;
import org.apache.jackrabbit.value.ValueFormat;
@@ -150,13 +147,11 @@
import java.util.Properties;
import java.util.List;
import java.util.ArrayList;
-import java.util.Map;
import java.util.Iterator;
import java.util.HashSet;
import java.util.Set;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Collections;
import java.io.InputStream;
import java.io.IOException;
@@ -181,8 +176,6 @@
}
private static final SubscriptionInfo S_INFO = new SubscriptionInfo(ALL_EVENTS, true, DavConstants.INFINITE_TIMEOUT);
- private static long POLL_INTERVAL = 300000000; // TODO: make configurable
-
private final IdFactory idFactory;
private final ValueFactory valueFactory;
@@ -310,7 +303,7 @@
* @return
* @throws RepositoryException
*/
- private EventIterator execute(DavMethod method, SessionInfo sessionInfo) throws RepositoryException {
+ private EventBundle[] execute(DavMethod method, SessionInfo sessionInfo) throws RepositoryException {
// TODO: build specific subscrUri
// TODO: check if 'all event' subscription is ok
String subscrUri = uriResolver.getRootItemUri(sessionInfo.getWorkspaceName());
@@ -324,7 +317,12 @@
getClient(sessionInfo).executeMethod(method);
method.checkSuccess();
- EventIterator events = poll(subscrUri, subscrId, sessionInfo);
+ // TODO: poll until we see our change
+ EventBundle[] events = null;
+ int retries = 10;
+ while ((events == null || events.length == 0) && retries-- > 0) {
+ events = poll(subscrUri, subscrId, sessionInfo);
+ }
return events;
} catch (IOException e) {
throw new RepositoryException(e);
@@ -425,7 +423,7 @@
if (!wspName.equals(workspaceName)) {
throw new LoginException("Login failed: Invalid workspace name " + workspaceName);
}
- return new SessionInfoImpl(credentials, workspaceName, new SubscriptionMgrImpl());
+ return new SessionInfoImpl(credentials, workspaceName);
} else {
throw new LoginException("Login failed: Unknown workspace '" + workspaceName+ " '.");
}
@@ -442,8 +440,11 @@
public void dispose(SessionInfo sessionInfo) throws RepositoryException {
checkSessionInfo(sessionInfo);
- SubscriptionManager sMgr = ((SessionInfoImpl)sessionInfo).getSubscriptionManager();
- sMgr.dispose();
+ String subscriptionId = ((SessionInfoImpl)sessionInfo).getSubscriptionId();
+ if (subscriptionId != null) {
+ String rootUri = uriResolver.getRootItemUri(sessionInfo.getWorkspaceName());
+ unsubscribe(rootUri, subscriptionId, sessionInfo);
+ }
}
/**
@@ -769,19 +770,19 @@
/**
* @see RepositoryService#submit(Batch)
*/
- public EventIterator submit(Batch batch) throws RepositoryException {
+ public EventBundle[] submit(Batch batch) throws RepositoryException {
if (!(batch instanceof BatchImpl)) {
throw new RepositoryException("Unknown Batch implementation.");
}
BatchImpl batchImpl = (BatchImpl) batch;
if (batchImpl.isEmpty()) {
batchImpl.dispose();
- return IteratorHelper.EMPTY;
+ return new EventBundle[]{EventBundleImpl.EMPTY};
}
// send batched information
try {
HttpClient client = batchImpl.start();
- EventIterator events;
+ EventBundle[] events;
boolean success = false;
try {
Iterator it = batchImpl.methods();
@@ -816,7 +817,7 @@
/**
* @see RepositoryService#importXml(SessionInfo, NodeId, InputStream, int)
*/
- public EventIterator importXml(SessionInfo sessionInfo, NodeId parentId, InputStream xmlStream, int uuidBehaviour) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
+ public EventBundle[] importXml(SessionInfo sessionInfo, NodeId parentId, InputStream xmlStream, int uuidBehaviour) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
// TODO: improve. currently random name is built instead of retrieving name of new resource from top-level xml element within stream
QName nodeName = new QName(QName.NS_DEFAULT_URI, UUID.randomUUID().toString());
String uri = getItemUri(parentId, nodeName, sessionInfo);
@@ -829,7 +830,7 @@
/**
* @see RepositoryService#move(SessionInfo, NodeId, NodeId, QName)
*/
- public EventIterator move(SessionInfo sessionInfo, NodeId srcNodeId, NodeId destParentNodeId, QName destName) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
+ public EventBundle[] move(SessionInfo sessionInfo, NodeId srcNodeId, NodeId destParentNodeId, QName destName) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
String uri = getItemUri(srcNodeId, sessionInfo);
String destUri = getItemUri(destParentNodeId, destName, sessionInfo);
MoveMethod method = new MoveMethod(uri, destUri, true);
@@ -839,7 +840,7 @@
/**
* @see RepositoryService#copy(SessionInfo, String, NodeId, NodeId, QName)
*/
- public EventIterator copy(SessionInfo sessionInfo, String srcWorkspaceName, NodeId srcNodeId, NodeId destParentNodeId, QName destName) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
+ public EventBundle[] copy(SessionInfo sessionInfo, String srcWorkspaceName, NodeId srcNodeId, NodeId destParentNodeId, QName destName) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
String uri = uriResolver.getItemUri(srcNodeId, srcWorkspaceName, sessionInfo);
String destUri = getItemUri(destParentNodeId, destName, sessionInfo);
CopyMethod method = new CopyMethod(uri, destUri, true, false);
@@ -849,7 +850,7 @@
/**
* @see RepositoryService#update(SessionInfo, NodeId, String)
*/
- public EventIterator update(SessionInfo sessionInfo, NodeId nodeId, String srcWorkspaceName) throws NoSuchWorkspaceException, AccessDeniedException, LockException, InvalidItemStateException, RepositoryException {
+ public EventBundle[] update(SessionInfo sessionInfo, NodeId nodeId, String srcWorkspaceName) throws NoSuchWorkspaceException, AccessDeniedException, LockException, InvalidItemStateException, RepositoryException {
String uri = getItemUri(nodeId, sessionInfo);
String workspUri = uriResolver.getWorkspaceUri(srcWorkspaceName);
@@ -859,7 +860,7 @@
/**
* @see RepositoryService#clone(SessionInfo, String, NodeId, NodeId, QName, boolean)
*/
- public EventIterator clone(SessionInfo sessionInfo, String srcWorkspaceName, NodeId srcNodeId, NodeId destParentNodeId, QName destName, boolean removeExisting) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
+ public EventBundle[] clone(SessionInfo sessionInfo, String srcWorkspaceName, NodeId srcNodeId, NodeId destParentNodeId, QName destName, boolean removeExisting) throws NoSuchWorkspaceException, ConstraintViolationException, VersionException, AccessDeniedException, PathNotFoundException, ItemExistsException, LockException, UnsupportedRepositoryOperationException, RepositoryException {
// TODO: missing implementation
throw new UnsupportedOperationException("Missing implementation");
}
@@ -908,13 +909,13 @@
/**
* @see RepositoryService#lock(SessionInfo, NodeId, boolean, boolean)
*/
- public EventIterator lock(SessionInfo sessionInfo, NodeId nodeId, boolean deep, boolean sessionScoped) throws UnsupportedRepositoryOperationException, LockException, AccessDeniedException, InvalidItemStateException, RepositoryException {
+ public EventBundle[] lock(SessionInfo sessionInfo, NodeId nodeId, boolean deep, boolean sessionScoped) throws UnsupportedRepositoryOperationException, LockException, AccessDeniedException, InvalidItemStateException, RepositoryException {
try {
String uri = getItemUri(nodeId, sessionInfo);
Scope scope = (sessionScoped) ? ItemResourceConstants.EXCLUSIVE_SESSION : Scope.EXCLUSIVE;
LockMethod method = new LockMethod(uri, scope, Type.WRITE,
sessionInfo.getUserID(), DavConstants.INFINITE_TIMEOUT, deep);
- EventIterator events = execute(method, sessionInfo);
+ EventBundle[] events = execute(method, sessionInfo);
String lockToken = method.getLockToken();
sessionInfo.addLockToken(lockToken);
@@ -931,7 +932,7 @@
/**
* @see RepositoryService#refreshLock(SessionInfo, NodeId)
*/
- public EventIterator refreshLock(SessionInfo sessionInfo, NodeId nodeId) throws LockException, RepositoryException {
+ public EventBundle[] refreshLock(SessionInfo sessionInfo, NodeId nodeId) throws LockException, RepositoryException {
String uri = getItemUri(nodeId, sessionInfo);
// since sessionInfo does not allow to retrieve token by NodeId,
// pass all available lock tokens to the LOCK method (TODO: correct?)
@@ -942,7 +943,7 @@
/**
* @see RepositoryService#unlock(SessionInfo, NodeId)
*/
- public EventIterator unlock(SessionInfo sessionInfo, NodeId nodeId) throws UnsupportedRepositoryOperationException, LockException, AccessDeniedException, InvalidItemStateException, RepositoryException {
+ public EventBundle[] unlock(SessionInfo sessionInfo, NodeId nodeId) throws UnsupportedRepositoryOperationException, LockException, AccessDeniedException, InvalidItemStateException, RepositoryException {
String uri = getItemUri(nodeId, sessionInfo);
// Note: since sessionInfo does not allow to identify the id of the
// lock holding node, we need to access the token via lockInfo
@@ -953,7 +954,7 @@
// TODO: ev. additional check if lt is present on the sessionInfo?
UnLockMethod method = new UnLockMethod(uri, lockToken);
- EventIterator events = execute(method, sessionInfo);
+ EventBundle[] events = execute(method, sessionInfo);
sessionInfo.removeLockToken(lockToken);
return events;
@@ -962,7 +963,7 @@
/**
* @see RepositoryService#checkin(SessionInfo, NodeId)
*/
- public EventIterator checkin(SessionInfo sessionInfo, NodeId nodeId) throws VersionException, UnsupportedRepositoryOperationException, InvalidItemStateException, LockException, RepositoryException {
+ public EventBundle[] checkin(SessionInfo sessionInfo, NodeId nodeId) throws VersionException, UnsupportedRepositoryOperationException, InvalidItemStateException, LockException, RepositoryException {
String uri = getItemUri(nodeId, sessionInfo);
CheckinMethod method = new CheckinMethod(uri);
@@ -972,7 +973,7 @@
/**
* @see RepositoryService#checkout(SessionInfo, NodeId)
*/
- public EventIterator checkout(SessionInfo sessionInfo, NodeId nodeId) throws UnsupportedRepositoryOperationException, LockException, RepositoryException {
+ public EventBundle[] checkout(SessionInfo sessionInfo, NodeId nodeId) throws UnsupportedRepositoryOperationException, LockException, RepositoryException {
String uri = getItemUri(nodeId, sessionInfo);
CheckoutMethod method = new CheckoutMethod(uri);
@@ -982,7 +983,7 @@
/**
* @see RepositoryService#restore(SessionInfo, NodeId, NodeId, boolean)
*/
- public EventIterator restore(SessionInfo sessionInfo, NodeId nodeId, NodeId versionId, boolean removeExisting) throws VersionException, PathNotFoundException, ItemExistsException, UnsupportedRepositoryOperationException, LockException, InvalidItemStateException, RepositoryException {
+ public EventBundle[] restore(SessionInfo sessionInfo, NodeId nodeId, NodeId versionId, boolean removeExisting) throws VersionException, PathNotFoundException, ItemExistsException, UnsupportedRepositoryOperationException, LockException, InvalidItemStateException, RepositoryException {
String uri = getItemUri(nodeId, sessionInfo);
String vUri = getItemUri(versionId, sessionInfo);
@@ -992,7 +993,7 @@
/**
* @see RepositoryService#restore(SessionInfo, NodeId[], boolean)
*/
- public EventIterator restore(SessionInfo sessionInfo, NodeId[] versionIds, boolean removeExisting) throws ItemExistsException, UnsupportedRepositoryOperationException, VersionException, LockException, InvalidItemStateException, RepositoryException {
+ public EventBundle[] restore(SessionInfo sessionInfo, NodeId[] versionIds, boolean removeExisting) throws ItemExistsException, UnsupportedRepositoryOperationException, VersionException, LockException, InvalidItemStateException, RepositoryException {
String uri = uriResolver.getWorkspaceUri(sessionInfo.getWorkspaceName());
String[] vUris = new String[versionIds.length];
for (int i = 0; i < versionIds.length; i++) {
@@ -1002,7 +1003,7 @@
return update(uri, vUris, UpdateInfo.UPDATE_BY_VERSION, removeExisting, sessionInfo);
}
- private EventIterator update(String uri, String[] updateSource, int updateType, boolean removeExisting, SessionInfo sessionInfo) throws RepositoryException {
+ private EventBundle[] update(String uri, String[] updateSource, int updateType, boolean removeExisting, SessionInfo sessionInfo) throws RepositoryException {
try {
UpdateInfo uInfo;
if (removeExisting) {
@@ -1024,7 +1025,7 @@
/**
* @see RepositoryService#merge(SessionInfo, NodeId, String, boolean)
*/
- public EventIterator merge(SessionInfo sessionInfo, NodeId nodeId, String srcWorkspaceName, boolean bestEffort) throws NoSuchWorkspaceException, AccessDeniedException, MergeException, LockException, InvalidItemStateException, RepositoryException {
+ public EventBundle[] merge(SessionInfo sessionInfo, NodeId nodeId, String srcWorkspaceName, boolean bestEffort) throws NoSuchWorkspaceException, AccessDeniedException, MergeException, LockException, InvalidItemStateException, RepositoryException {
try {
String wspHref = uriResolver.getWorkspaceUri(srcWorkspaceName);
Element mElem = MergeInfo.createMergeElement(new String[] {wspHref}, bestEffort, false, domFactory);
@@ -1043,7 +1044,7 @@
/**
* @see RepositoryService#resolveMergeConflict(SessionInfo, NodeId, NodeId[], NodeId[])
*/
- public EventIterator resolveMergeConflict(SessionInfo sessionInfo, NodeId nodeId, NodeId[] mergeFailedIds, NodeId[] predecessorIds) throws VersionException, InvalidItemStateException, UnsupportedRepositoryOperationException, RepositoryException {
+ public EventBundle[] resolveMergeConflict(SessionInfo sessionInfo, NodeId nodeId, NodeId[] mergeFailedIds, NodeId[] predecessorIds) throws VersionException, InvalidItemStateException, UnsupportedRepositoryOperationException, RepositoryException {
try {
List changeList = new ArrayList();
String[] mergeFailedHref = new String[mergeFailedIds.length];
@@ -1071,7 +1072,7 @@
/**
* @see RepositoryService#addVersionLabel(SessionInfo,NodeId,NodeId,QName,boolean)
*/
- public EventIterator addVersionLabel(SessionInfo sessionInfo, NodeId versionHistoryId, NodeId versionId, QName label, boolean moveLabel) throws VersionException, RepositoryException {
+ public EventBundle[] addVersionLabel(SessionInfo sessionInfo, NodeId versionHistoryId, NodeId versionId, QName label, boolean moveLabel) throws VersionException, RepositoryException {
try {
String uri = getItemUri(versionId, sessionInfo);
LabelMethod method = new LabelMethod(uri, NameFormat.format(label, nsResolver), (moveLabel) ? LabelInfo.TYPE_SET : LabelInfo.TYPE_ADD);
@@ -1086,7 +1087,7 @@
/**
* @see RepositoryService#removeVersionLabel(SessionInfo,NodeId,NodeId,QName)
*/
- public EventIterator removeVersionLabel(SessionInfo sessionInfo, NodeId versionHistoryId, NodeId versionId, QName label) throws VersionException, RepositoryException {
+ public EventBundle[] removeVersionLabel(SessionInfo sessionInfo, NodeId versionHistoryId, NodeId versionId, QName label) throws VersionException, RepositoryException {
try {
String uri = getItemUri(versionId, sessionInfo);
LabelMethod method = new LabelMethod(uri, NameFormat.format(label, nsResolver), LabelInfo.TYPE_REMOVE);
@@ -1115,9 +1116,7 @@
} catch (DavException e) {
throw ExceptionConverter.generate(e);
} finally {
- if (method != null) {
- method.releaseConnection();
- }
+ method.releaseConnection();
}
}
@@ -1147,78 +1146,25 @@
}
/**
- * @see RepositoryService#addEventListener(SessionInfo,NodeId,EventListener,int,boolean,String[],QName[])
+ * @see RepositoryService#getEvents(SessionInfo, long)
*/
- public void addEventListener(SessionInfo sessionInfo, NodeId nodeId, EventListener listener, int eventTypes, boolean isDeep, String[] uuids, QName[] nodeTypeIds) throws RepositoryException {
- // build event types
- List eTypes = new ArrayList();
- if ((eventTypes & Event.NODE_ADDED) == Event.NODE_ADDED) {
- eTypes.add(SubscriptionImpl.getEventType(javax.jcr.observation.Event.NODE_ADDED));
- }
- if ((eventTypes & Event.NODE_REMOVED) == Event.NODE_REMOVED) {
- eTypes.add(SubscriptionImpl.getEventType(javax.jcr.observation.Event.NODE_REMOVED));
- }
- if ((eventTypes & Event.PROPERTY_ADDED) == Event.PROPERTY_ADDED) {
- eTypes.add(SubscriptionImpl.getEventType(javax.jcr.observation.Event.PROPERTY_ADDED));
- }
- if ((eventTypes & Event.PROPERTY_REMOVED) == Event.PROPERTY_REMOVED) {
- eTypes.add(SubscriptionImpl.getEventType(javax.jcr.observation.Event.PROPERTY_REMOVED));
- }
- if ((eventTypes & Event.PROPERTY_CHANGED) == Event.PROPERTY_CHANGED) {
- eTypes.add(SubscriptionImpl.getEventType(javax.jcr.observation.Event.PROPERTY_CHANGED));
- }
- EventType[] etArr = (EventType[]) eTypes.toArray(new EventType[eTypes.size()]);
-
- // build filters from params
- List filters = new ArrayList();
- for (int i = 0; uuids != null && i < uuids.length; i++) {
- filters.add(new Filter(ObservationConstants.XML_UUID, ObservationConstants.NAMESPACE, uuids[i]));
- }
- for (int i = 0; nodeTypeIds != null && i < nodeTypeIds.length; i++) {
- try {
- String ntName = NameFormat.format(nodeTypeIds[i], nsResolver);
- filters.add(new Filter(ObservationConstants.XML_NODETYPE_NAME, ObservationConstants.NAMESPACE, ntName));
- } catch (NoPrefixDeclaredException e) {
- throw new RepositoryException(e);
- }
- }
- Filter[] ftArr = (Filter[]) filters.toArray(new Filter[filters.size()]);
-
- boolean noLocal = true;
- SubscriptionInfo subscriptionInfo = new SubscriptionInfo(etArr, ftArr, noLocal, isDeep, DavConstants.UNDEFINED_TIMEOUT);
- String uri = getItemUri(nodeId, sessionInfo);
-
+ public EventBundle[] getEvents(SessionInfo sessionInfo, long timeout)
+ throws RepositoryException, UnsupportedRepositoryOperationException {
checkSessionInfo(sessionInfo);
- SubscriptionManager sMgr = ((SessionInfoImpl)sessionInfo).getSubscriptionManager();
- if (sMgr.subscriptionExists(listener)) {
- String subscriptionId = sMgr.getSubscriptionId(listener);
- subscribe(uri, subscriptionInfo, subscriptionId, sessionInfo);
- log.debug("Subscribed on server for listener " + listener);
- } else {
- String subscriptionId = subscribe(uri, subscriptionInfo, null, sessionInfo);
- log.debug("Subscribed on server for listener " + listener);
- sMgr.addSubscription(uri, subscriptionId, listener);
- log.debug("Added subscription for listener " + listener);
+ SessionInfoImpl sessionInfoImpl = (SessionInfoImpl)sessionInfo;
+ String rootUri = uriResolver.getRootItemUri(sessionInfo.getWorkspaceName());
+ String subscriptionId = sessionInfoImpl.getSubscriptionId();
+ if (subscriptionId == null) {
+ SubscriptionInfo subscriptionInfo = new SubscriptionInfo(ALL_EVENTS, true, DavConstants.UNDEFINED_TIMEOUT);
+ subscriptionId = subscribe(rootUri, subscriptionInfo, null, sessionInfo);
+ log.debug("Subscribed on server for session info " + sessionInfo);
+ sessionInfoImpl.setSubscriptionId(subscriptionId);
}
- }
- /**
- * @see RepositoryService#removeEventListener(SessionInfo, NodeId, EventListener)
- */
- public void removeEventListener(SessionInfo sessionInfo, NodeId nodeId, EventListener listener) throws RepositoryException {
- checkSessionInfo(sessionInfo);
- SubscriptionManager sMgr = ((SessionInfoImpl)sessionInfo).getSubscriptionManager();
- String subscriptionId = sMgr.getSubscriptionId(listener);
-
- String uri = getItemUri(nodeId, sessionInfo);
- sMgr.removeSubscription(listener);
- log.debug("Removed subscription for listener " + listener);
- unsubscribe(uri, subscriptionId, sessionInfo);
- log.debug("Unsubscribed on server for listener " + listener);
+ return poll(rootUri, subscriptionId, sessionInfo);
}
-
private String subscribe(String uri, SubscriptionInfo subscriptionInfo, String subscriptionId, SessionInfo sessionInfo) throws RepositoryException {
SubscribeMethod method = null;
try {
@@ -1258,7 +1204,7 @@
}
}
- private EventIterator poll(String uri, String subscriptionId, SessionInfo sessionInfo) throws RepositoryException {
+ private EventBundle[] poll(String uri, String subscriptionId, SessionInfo sessionInfo) throws RepositoryException {
PollMethod method = null;
try {
method = new PollMethod(uri, subscriptionId);
@@ -1266,12 +1212,21 @@
method.checkSuccess();
EventDiscovery disc = method.getResponseAsEventDiscovery();
+ EventBundle[] events;
if (disc.isEmpty()) {
- return IteratorHelper.EMPTY;
+ events = new EventBundle[]{EventBundleImpl.EMPTY};
} else {
Element discEl = disc.toXml(domFactory);
- return new EventIteratorImpl(discEl, uriResolver, sessionInfo);
+ ElementIterator it = DomUtil.getChildren(discEl,
+ ObservationConstants.XML_EVENTBUNDLE,
+ ObservationConstants.NAMESPACE);
+ List bundles = new ArrayList();
+ while (it.hasNext()) {
+ bundles.add(new EventBundleImpl(it.nextElement(), uriResolver, sessionInfo));
+ }
+ events = (EventBundle[]) bundles.toArray(new EventBundle[bundles.size()]);
}
+ return events;
} catch (IOException e) {
throw new RepositoryException(e);
} catch (DavException e) {
@@ -1499,7 +1454,7 @@
}
}
- private EventIterator end(HttpClient client, boolean commit) throws RepositoryException {
+ private EventBundle[] end(HttpClient client, boolean commit) throws RepositoryException {
checkConsumed();
String uri = getItemUri(targetId, sessionInfo);
@@ -1523,7 +1478,12 @@
method.checkSuccess();
// retrieve events
- EventIterator events = poll(subscrUri, subscriptionId, sessionInfo);
+ // TODO: until we see our change!
+ EventBundle[] events = null;
+ int retries = 10;
+ while ((events == null || events.length == 0) && retries-- > 0) {
+ events = poll(subscrUri, subscriptionId, sessionInfo);
+ }
return events;
} catch (IOException e) {
throw new RepositoryException(e);
@@ -1631,9 +1591,8 @@
*/
public void addProperty(NodeId parentId, QName propertyName, InputStream value, int propertyType) throws ValueFormatException, VersionException, LockException, ConstraintViolationException, PathNotFoundException, ItemExistsException, AccessDeniedException, UnsupportedRepositoryOperationException, RepositoryException {
checkConsumed();
- QValue qV = null;
try {
- qV = QValue.create(value, propertyType);
+ QValue qV = QValue.create(value, propertyType);
Value jcrValue = ValueFormat.getJCRValue(qV, nsResolver, valueFactory);
ValuesProperty vp = new ValuesProperty(jcrValue);
internalAddProperty(parentId, propertyName, vp);
@@ -1865,110 +1824,6 @@
MoveMethod method = new MoveMethod(uri, destUri, true);
methods.add(method);
- }
- }
-
- /**
- * <code>SubscriptionManager</code>...
- */
- private class SubscriptionMgrImpl implements SubscriptionManager {
-
- private SessionInfo sessionInfo;
-
- private final Map subscriptions = new HashMap();
- private final Object subscriptionsLock = new Object();
- private Map currentSubscriptions;
-
- private Thread t;
-
- public void setSessionInfo(SessionInfo sessionInfo) {
- this.sessionInfo = sessionInfo;
- }
-
- public boolean subscriptionExists(EventListener listener) {
- return getSubscriptions().containsKey(listener);
- }
-
- public String getSubscriptionId(EventListener listener) {
- if (getSubscriptions().containsKey(listener)) {
- return ((String[]) getSubscriptions().get(listener))[1];
- } else {
- return null;
- }
- }
-
- public void addSubscription(String uri, String subscriptionId, EventListener listener) {
- synchronized (subscriptionsLock) {
- boolean doStart = subscriptions.isEmpty();
- subscriptions.put(listener, new String[] {uri,subscriptionId});
- currentSubscriptions = null;
- if (doStart) {
- startPolling();
- }
- }
- }
-
- public synchronized void removeSubscription(EventListener listener) {
- synchronized (subscriptionsLock) {
- subscriptions.remove(listener);
- currentSubscriptions = null;
- if (subscriptions.isEmpty()) {
- stopPolling();
- }
- }
- }
-
- public void dispose() {
- synchronized (subscriptionsLock) {
- if (!subscriptions.isEmpty()) {
- subscriptions.clear();
- currentSubscriptions = null;
- stopPolling();
- }
- }
- }
-
- private Map getSubscriptions() {
- synchronized (subscriptionsLock) {
- if (currentSubscriptions == null) {
- currentSubscriptions = Collections.unmodifiableMap(new HashMap(subscriptions));
- }
- return currentSubscriptions;
- }
- }
-
- private void startPolling() {
- Runnable r = new Runnable() {
- public void run() {
- while (t == Thread.currentThread()) {
- try {
- // sleep
- Thread.sleep(POLL_INTERVAL);
- // poll
- Iterator lstnIterator = getSubscriptions().keySet().iterator();
- while (lstnIterator.hasNext()) {
- EventListener listener = (EventListener) lstnIterator.next();
- String[] value = (String[]) getSubscriptions().get(listener);
- String uri = value[0];
- String subscriptionId = value[1];
- EventIterator eventIterator = poll(uri, subscriptionId, sessionInfo);
- listener.onEvent(eventIterator);
- }
- } catch (InterruptedException e) {
- log.debug("Polling thread interrupted: " + e.getMessage());
- return;
- } catch (RepositoryException e) {
- log.warn("Polling failed: ", e.getMessage());
- }
- }
- }
- };
- t = new Thread(r);
- t.start();
- }
-
- private void stopPolling() {
- t.interrupt();
}
}
}
Modified: jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/SessionInfoImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/SessionInfoImpl.java?view=diff&rev=468388&r1=468387&r2=468388
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/SessionInfoImpl.java (original)
+++ jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/SessionInfoImpl.java Fri Oct 27 07:16:13 2006
@@ -33,15 +33,20 @@
private final CredentialsWrapper credentials;
private final String workspaceName;
- private final SubscriptionManager subscrMgr;
private final Set lockTokens = new HashSet();
- SessionInfoImpl(CredentialsWrapper creds, String workspaceName, SubscriptionManager subscrMgr) {
+ private String lastEventBundleId;
+
+ /**
+ * The subscriptionId if this session info is subscribed to observation
+ * events.
+ */
+ private String subscriptionId;
+
+ SessionInfoImpl(CredentialsWrapper creds, String workspaceName) {
this.credentials = creds;
this.workspaceName = workspaceName;
- this.subscrMgr = subscrMgr;
- subscrMgr.setSessionInfo(this);
}
//--------------------------------------------------------< SessionInfo >---
@@ -80,12 +85,45 @@
lockTokens.remove(lockToken);
}
+ /**
+ * @inheritDoc
+ */
+ public String getLastEventBundleId() {
+ return lastEventBundleId;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public void setLastEventBundleId(String eventBundleId) {
+ lastEventBundleId = eventBundleId;
+ }
+
//--------------------------------------------------------------------------
+
CredentialsWrapper getCredentials() {
return credentials;
}
- SubscriptionManager getSubscriptionManager() {
- return subscrMgr;
+ /**
+ * Returns the subscriptionId for this <code>SessionInfo</code> or
+ * <code>null</code> if no subscription is present.
+ *
+ * @return the subscriptionId for this <code>SessionInfo</code>.
+ */
+ String getSubscriptionId() {
+ return subscriptionId;
+ }
+
+ /**
+ * Sets a new subscriptionId for this <code>SessionInfo</code>.
+ *
+ * @param subscriptionId the new subscriptionId.
+ * @return the old subscriptionId or <code>null</code> if there was none.
+ */
+ String setSubscriptionId(String subscriptionId) {
+ String old = this.subscriptionId;
+ this.subscriptionId = subscriptionId;
+ return old;
}
}