You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by as...@apache.org on 2013/05/18 14:17:42 UTC

svn commit: r1484099 [2/4] - in /cxf/trunk: distribution/src/main/release/samples/ distribution/src/main/release/samples/ws_eventing/ distribution/src/main/release/samples/ws_eventing/src/ distribution/src/main/release/samples/ws_eventing/src/main/ dis...

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerImpl.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerImpl.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerImpl.java Sat May 18 12:17:39 2013
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.manager;
+
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.binding.soap.SoapFault;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ws.eventing.AttributedURIType;
+import org.apache.cxf.ws.eventing.DeliveryType;
+import org.apache.cxf.ws.eventing.EndpointReferenceType;
+import org.apache.cxf.ws.eventing.ExpirationType;
+import org.apache.cxf.ws.eventing.FilterType;
+import org.apache.cxf.ws.eventing.FormatType;
+import org.apache.cxf.ws.eventing.NotifyTo;
+import org.apache.cxf.ws.eventing.ReferenceParametersType;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionDatabase;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionDatabaseImpl;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+import org.apache.cxf.ws.eventing.backend.notification.NotificatorService;
+import org.apache.cxf.ws.eventing.backend.notification.SubscriptionEndStatus;
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+import org.apache.cxf.ws.eventing.shared.faults.CannotProcessFilter;
+import org.apache.cxf.ws.eventing.shared.faults.DeliveryFormatRequestedUnavailable;
+import org.apache.cxf.ws.eventing.shared.faults.FilteringRequestedUnavailable;
+import org.apache.cxf.ws.eventing.shared.faults.NoDeliveryMechanismEstablished;
+import org.apache.cxf.ws.eventing.shared.faults.UnknownSubscription;
+import org.apache.cxf.ws.eventing.shared.utils.DurationAndDateUtil;
+import org.apache.cxf.ws.eventing.shared.utils.EPRInspectionTool;
+import org.apache.cxf.ws.eventing.shared.utils.FilteringUtil;
+
+/**
+ * The core class representing WS-Eventing backend. It holds an instance of a database and
+ * acts as a layer for communicating with it.
+ */
+public class SubscriptionManagerImpl implements SubscriptionManager {
+
+    protected static final Logger LOG = LogUtils.getLogger(SubscriptionManagerImpl.class);
+
+    protected final SubscriptionDatabase database;
+    private final String subscriptionIdNamespace;
+    private final String subscriptionIdElementName;
+    private String url;
+    private NotificatorService notificator;
+
+    public SubscriptionManagerImpl(String url) {
+        database = new SubscriptionDatabaseImpl();
+        this.subscriptionIdNamespace = EventingConstants.SUBSCRIPTION_ID_DEFAULT_NAMESPACE;
+        this.subscriptionIdElementName = EventingConstants.SUBSCRIPTION_ID_DEFAULT_ELEMENT_NAME;
+        this.url = url;
+    }
+
+    public  SubscriptionManagerImpl(String url, String namespace, String elementName) {
+        database = new SubscriptionDatabaseImpl();
+        this.url = url;
+        this.subscriptionIdNamespace = namespace;
+        this.subscriptionIdElementName = elementName;
+    }
+
+
+    @Override
+    public SubscriptionTicketGrantingResponse subscribe(DeliveryType delivery, EndpointReferenceType endTo,
+                                                        ExpirationType expires, FilterType filter,
+                                                        FormatType format) {
+        SubscriptionTicket ticket = new SubscriptionTicket();
+        SubscriptionTicketGrantingResponse response = new SubscriptionTicketGrantingResponse();
+        grantSubscriptionManagerReference(ticket, response);
+        processDelivery(delivery, ticket, response);
+        processEndTo(endTo, ticket, response);
+        processExpiration(expires, ticket, response);
+        processFilters(filter, ticket, response);
+        processFormat(format, ticket, response);
+        getDatabase().addTicket(ticket);
+        return response;
+    }
+
+    @Override
+    public List<SubscriptionTicket> getTickets() {
+        return Collections.unmodifiableList(database.getTickets());
+    }
+
+    protected SubscriptionDatabase getDatabase() {
+        return database;
+    }
+
+    protected void processFormat(FormatType format, SubscriptionTicket ticket,
+                                 SubscriptionTicketGrantingResponse response) {
+        if (format == null) {
+            ticket.setWrappedDelivery(false);
+            return;
+        }
+        if (format.getName().equals(EventingConstants.DELIVERY_FORMAT_WRAPPED)) {
+            LOG.info("[subscription=" + ticket.getUuid() + "] Wrapped delivery format was requested.");
+            ticket.setWrappedDelivery(true);
+        } else if (format.getName().equals(EventingConstants.DELIVERY_FORMAT_UNWRAPPED)) {
+            LOG.info("[subscription=" + ticket.getUuid() + "] Wrapped delivery format was NOT requested.");
+            ticket.setWrappedDelivery(false);
+        } else {
+            LOG.info("[subscription=" + ticket.getUuid() + "] Unknown delivery format: " + format.getName());
+            throw new DeliveryFormatRequestedUnavailable();
+        }
+    }
+
+    protected void processFilters(FilterType request, SubscriptionTicket ticket,
+                                  SubscriptionTicketGrantingResponse response) {
+        if (request != null) {
+            // test if the requested filtering dialect is supported
+            if (FilteringUtil.isFilteringDialectSupported(request.getDialect())) {
+                String filter = (String)request.getContent().get(0);
+                LOG.fine("Found filter content: " + filter);
+                if (!FilteringUtil.isValidFilter(filter)) {
+                    throw new CannotProcessFilter();
+                }
+                ticket.setFilter(request);
+            } else {
+                throw new FilteringRequestedUnavailable();
+            }
+        }
+    }
+
+    /**
+     * process the stuff concerning expiration request (wse:Expires)
+     */
+    protected void processExpiration(ExpirationType request, SubscriptionTicket ticket,
+                                     SubscriptionTicketGrantingResponse response) {
+        XMLGregorianCalendar granted;
+        if (request != null) {
+            Object expirationTypeValue;
+            try {
+                expirationTypeValue = DurationAndDateUtil.parseDurationOrTimestamp(request.getValue());
+            } catch (IllegalArgumentException ex) {
+                throw new SoapFault("Cannot parse expiration", new QName("http://cxf.apache.org/eventing", "Error"));
+            }
+            Boolean bestEffort = request.isBestEffort();
+            if (bestEffort != null && bestEffort) {
+                if (expirationTypeValue instanceof javax.xml.datatype.Duration) {
+                    granted = grantExpirationFor((javax.xml.datatype.Duration)expirationTypeValue);
+                } else if (expirationTypeValue instanceof XMLGregorianCalendar) {
+                    granted = grantExpirationFor((XMLGregorianCalendar)expirationTypeValue);
+                } else {
+                    throw new Error("expirationTypeValue of unexpected type: " + expirationTypeValue.getClass());
+                }
+            } else {
+                // client did not specify BestEffort granting, so we must either follow their wish
+                // or throw a UnsupportedExpirationValue fault
+                if (expirationTypeValue instanceof javax.xml.datatype.Duration) {
+                    try {
+                        if (DurationAndDateUtil.isPT0S((javax.xml.datatype.Duration)expirationTypeValue)) {
+                            ticket.setNonExpiring(true);
+                        }
+                        granted = DatatypeFactory.newInstance().newXMLGregorianCalendar(new GregorianCalendar());
+                        granted.add((javax.xml.datatype.Duration)expirationTypeValue);
+                    } catch (DatatypeConfigurationException e) {
+                        throw new Error(e);
+                    }
+                } else if (expirationTypeValue instanceof XMLGregorianCalendar) {
+                    granted = (XMLGregorianCalendar)expirationTypeValue;
+
+                } else {
+                    throw new Error("expirationTypeValue of unexpected type: " + expirationTypeValue.getClass());
+                }
+            }
+        } else {
+            granted = grantExpiration();
+        }
+        ticket.setExpires(granted);
+        response.setExpires(granted);
+        LOG.info("[subscription=" + ticket.getUuid() + "] Granted Expiration date: " + granted.toString());
+    }
+
+    protected void processEndTo(EndpointReferenceType request, SubscriptionTicket ticket,
+                                SubscriptionTicketGrantingResponse response) {
+        if (request != null) {
+            ticket.setEndTo(request);
+        }
+    }
+
+    protected void processDelivery(DeliveryType request, SubscriptionTicket ticket,
+                                   SubscriptionTicketGrantingResponse response) {
+        // check if there is any usable EPR in the Delivery part
+        try {
+            NotifyTo notifyTo = (NotifyTo)request.getContent().get(0);
+            if (!EPRInspectionTool.containsUsableEPR(notifyTo.getValue())) {
+                throw new NoDeliveryMechanismEstablished();
+            }
+        } catch (NullPointerException npe) {
+            throw new NoDeliveryMechanismEstablished();
+        } catch (IndexOutOfBoundsException ioobe) {
+            throw new NoDeliveryMechanismEstablished();
+        }
+        ticket.setDelivery(request);
+    }
+
+    protected void grantSubscriptionManagerReference(SubscriptionTicket ticket,
+                                                     SubscriptionTicketGrantingResponse response) {
+        EndpointReferenceType subscriptionManagerReference = new EndpointReferenceType();
+        subscriptionManagerReference.setAddress(getSubscriptionManagerAddress());
+        // generate a ID for this subscription
+        UUID uuid = UUID.randomUUID();
+        JAXBElement idqn = new JAXBElement(new QName(subscriptionIdNamespace, subscriptionIdElementName),
+                String.class,
+                uuid.toString());
+        subscriptionManagerReference.setReferenceParameters(new ReferenceParametersType());
+        subscriptionManagerReference.getReferenceParameters().getAny().add(idqn);
+        ticket.setUuid(uuid);
+        response.setSubscriptionManagerReference(subscriptionManagerReference);
+        response.setUUID(uuid);
+    }
+
+
+    /**
+     * Decide what expiration time to grant to the subscription, if
+     * the client specified a calendar time in the request and did specify BestEffort=true.
+     */
+    public XMLGregorianCalendar grantExpirationFor(XMLGregorianCalendar requested) {
+        return requested;   // default
+    }
+
+    /**
+     * Decide what expiration time to grant to the subscription, if
+     * the client specified a duration in the request and did specify BestEffort=true.
+     */
+    public XMLGregorianCalendar grantExpirationFor(javax.xml.datatype.Duration requested) {
+        XMLGregorianCalendar granted;
+        try {
+            granted = DatatypeFactory.newInstance().newXMLGregorianCalendar(new GregorianCalendar());
+            if (DurationAndDateUtil
+                    .isPT0S(requested)) { // The client requested a non-expiring subscription.
+                    // We will give them 5 years.
+                granted.add(DatatypeFactory.newInstance().newDurationYearMonth(true, 5, 0));
+            } else {
+                granted.add(requested); // default
+            }
+            return granted;
+        } catch (DatatypeConfigurationException e) {
+            throw new Error(e);
+        }
+    }
+
+    /**
+     * Decide what expiration time to grant to the subscription, if
+     * the client did not specify any particular wish for subscription length.
+     */
+    public XMLGregorianCalendar grantExpiration() {
+        try { // by default, we grant an expiration time of 2 years
+            DatatypeFactory factory = DatatypeFactory.newInstance();
+            XMLGregorianCalendar granted = factory.newXMLGregorianCalendar(new GregorianCalendar());
+            granted.add(factory.newDurationYearMonth(true, 2, 0));
+            return granted;
+        } catch (DatatypeConfigurationException ex) {
+            throw new Error(ex);
+        }
+    }
+
+
+    public AttributedURIType getSubscriptionManagerAddress() {
+        AttributedURIType ret = new AttributedURIType();
+        ret.setValue(url);
+        return ret;
+    }
+
+    @Override
+    public void unsubscribeTicket(UUID uuid) {
+        getDatabase().removeTicketByUUID(uuid);
+    }
+
+    @Override
+    public SubscriptionTicket findTicket(UUID uuid) {
+        return getDatabase().findById(uuid);
+    }
+
+    @Override
+    public ExpirationType renew(UUID uuid, ExpirationType requestedExpiration) {
+        SubscriptionTicket ticket = getDatabase().findById(uuid);
+        if (ticket == null) {
+            throw new UnknownSubscription();
+        }
+        LOG.info("[subscription=" + ticket.getUuid() + "] Requested renew expiration: "
+                + requestedExpiration.getValue());
+        LOG.fine("[subscription=" + ticket.getUuid() + "] Current expiration: " + ticket.getExpires().toXMLFormat());
+        ExpirationType response = new ExpirationType();
+        XMLGregorianCalendar grantedExpires;
+        if (DurationAndDateUtil.isDuration(requestedExpiration.getValue())) {
+            // duration was requested
+            javax.xml.datatype.Duration requestedDuration = DurationAndDateUtil
+                    .parseDuration(requestedExpiration.getValue());
+            javax.xml.datatype.Duration grantedDuration = requestedDuration;
+            LOG.info("[subscription=" + ticket.getUuid() + "] Granted renewal duration: " + grantedDuration.toString());
+            grantedExpires = getDatabase().findById(uuid)
+                    .getExpires();       // NOW() or current Expires() ????
+            grantedExpires.add(grantedDuration);
+            response.setValue(grantedDuration.toString());
+        } else {
+            // end-date was requested
+            grantedExpires = DurationAndDateUtil.parseXMLGregorianCalendar(requestedExpiration.getValue());
+            LOG.info("[subscription=" + ticket.getUuid() + "] Granted expiration: " + grantedExpires.toXMLFormat());
+            response.setValue(grantedExpires.toXMLFormat());
+        }
+        getDatabase().findById(uuid).setExpires(grantedExpires);
+        return response;
+    }
+
+    @Override
+    public void subscriptionEnd(UUID subscriptionId, String reason, SubscriptionEndStatus status) {
+        synchronized (database) {
+            SubscriptionTicket ticket = database.findById(subscriptionId);
+            if (ticket != null) {
+                database.removeTicketByUUID(subscriptionId);
+                if (ticket.getEndToURL() != null) {
+                    notificator.subscriptionEnd(ticket, reason, status);
+                }
+            } else {
+                LOG.severe("No such subscription: " + subscriptionId);
+            }
+        }
+    }
+
+    @Override
+    public void registerNotificator(NotificatorService service) {
+        this.notificator = service;
+    }
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForEventSources.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForEventSources.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForEventSources.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForEventSources.java Sat May 18 12:17:39 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.manager;
+
+import java.util.List;
+
+import org.apache.cxf.ws.eventing.DeliveryType;
+import org.apache.cxf.ws.eventing.EndpointReferenceType;
+import org.apache.cxf.ws.eventing.ExpirationType;
+import org.apache.cxf.ws.eventing.FilterType;
+import org.apache.cxf.ws.eventing.FormatType;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+
+public interface SubscriptionManagerInterfaceForEventSources {
+
+    SubscriptionTicketGrantingResponse subscribe(DeliveryType delivery, EndpointReferenceType endTo,
+                                                 ExpirationType expires, FilterType filter,
+                                                 FormatType format);
+
+    /**
+     * READ ONLY. Returns an unmodifiable list of the subscriptions in database.
+     */
+    List<SubscriptionTicket> getTickets();
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForManagers.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForManagers.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForManagers.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForManagers.java Sat May 18 12:17:39 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.manager;
+
+import java.util.UUID;
+
+import org.apache.cxf.ws.eventing.ExpirationType;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+
+public interface SubscriptionManagerInterfaceForManagers extends SubscriptionManagerInterfaceForEventSources {
+
+    void unsubscribeTicket(UUID uuid);
+
+    SubscriptionTicket findTicket(UUID uuid);
+
+    ExpirationType renew(UUID uuid, ExpirationType requestedExpiration);
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForNotificators.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForNotificators.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForNotificators.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionManagerInterfaceForNotificators.java Sat May 18 12:17:39 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.manager;
+
+
+import java.util.List;
+
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+import org.apache.cxf.ws.eventing.backend.notification.NotificatorService;
+
+public interface SubscriptionManagerInterfaceForNotificators {
+
+    List<SubscriptionTicket> getTickets();
+
+    void registerNotificator(NotificatorService notificator);
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionTicketGrantingResponse.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionTicketGrantingResponse.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionTicketGrantingResponse.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/manager/SubscriptionTicketGrantingResponse.java Sat May 18 12:17:39 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.manager;
+
+import java.util.UUID;
+
+import javax.xml.datatype.XMLGregorianCalendar;
+
+import org.apache.cxf.ws.eventing.DeliveryType;
+import org.apache.cxf.ws.eventing.EndpointReferenceType;
+import org.apache.cxf.ws.eventing.FilterType;
+import org.apache.cxf.ws.eventing.shared.faults.FilteringRequestedUnavailable;
+import org.apache.cxf.ws.eventing.shared.utils.FilteringUtil;
+
+
+
+/**
+ * This is the response send from SubscriptionManager backend logic to the EventSource webservice.
+ * It contains the necessary information for the Event Source to construct a JAX-WS response
+ * for a client who sent a subscription request.
+ */
+public class SubscriptionTicketGrantingResponse {
+
+    private EndpointReferenceType endTo;
+
+    private DeliveryType delivery;
+    private XMLGregorianCalendar expires;
+    private FilterType filter;
+    private UUID uuid;
+    private EndpointReferenceType subscriptionManagerReference;
+
+    public SubscriptionTicketGrantingResponse() {
+    }
+
+    public EndpointReferenceType getEndTo() {
+        return endTo;
+    }
+
+    public void setEndTo(EndpointReferenceType endTo) {
+        this.endTo = endTo;
+    }
+
+    public DeliveryType getDelivery() {
+        return delivery;
+    }
+
+    public void setDelivery(DeliveryType delivery) {
+        this.delivery = delivery;
+    }
+
+    public FilterType getFilter() {
+        return filter;
+    }
+
+    public void setFilter(FilterType filter) {
+        if (!FilteringUtil.isFilteringDialectSupported(filter.getDialect())) {
+            throw new FilteringRequestedUnavailable();
+        }
+        this.filter = filter;
+    }
+
+    public void setUUID(UUID uuidToSet) {
+        this.uuid = uuidToSet;
+    }
+
+    public UUID getUuid() {
+        return uuid;
+    }
+
+    public EndpointReferenceType getSubscriptionManagerReference() {
+        return subscriptionManagerReference;
+    }
+
+    public void setSubscriptionManagerReference(EndpointReferenceType subscriptionManagerReference) {
+        this.subscriptionManagerReference = subscriptionManagerReference;
+    }
+
+    public XMLGregorianCalendar getExpires() {
+        return expires;
+    }
+
+    public void setExpires(XMLGregorianCalendar expires) {
+        this.expires = expires;
+    }
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificationTask.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificationTask.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificationTask.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificationTask.java Sat May 18 12:17:39 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import javax.jws.WebService;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.apache.cxf.ws.eventing.EventType;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+import org.apache.cxf.ws.eventing.shared.handlers.ReferenceParametersAddingHandler;
+
+/**
+ * Represents the task to send a notification about a particular event to a particular subscribed client.
+ * Dispatch is performed according to a provided endpoint interface.
+ */
+class EventSinkInterfaceNotificationTask implements Runnable {
+
+    protected static final Logger LOG = LogUtils.getLogger(EventSinkInterfaceNotificationTask.class);
+
+    SubscriptionTicket target;
+    Object event;
+    Class<?> endpointInterface;
+
+    public EventSinkInterfaceNotificationTask(SubscriptionTicket ticket, Object event, Class<?> endpointInterface) {
+        this.target = ticket;
+        this.event = event;
+        this.endpointInterface = endpointInterface;
+
+    }
+
+    /**
+     * Logic needed to actually send the notification to the subscribed client.
+     */
+    @Override
+    public void run() {
+        try {
+            LOG.info("Starting notification task for subscription UUID " + target.getUuid());
+
+            Method method = null;
+            final Object proxy;
+            final Object param;
+            final Class<?> eventClass = event.getClass();
+            final Class<?>[] eventClassArray = new Class<?>[] {eventClass};
+            if (target.isWrappedDelivery()) {
+                proxy = getProxy(WrappedSink.class, eventClassArray);
+                param = new EventType();
+                ((EventType)param).getContent().add(eventClass.isAnnotationPresent(XmlRootElement.class)
+                                               ? event : convertToJAXBElement(event));
+                method = WrappedSink.class.getMethod("notifyEvent", EventType.class);
+            } else {
+                proxy = getProxy(endpointInterface);
+                // find the method to use
+                Method[] methods = endpointInterface.getMethods();
+                for (int i = 0; i < methods.length && method == null; i++) {
+                    if (Arrays.equals(methods[i].getParameterTypes(), eventClassArray)) {
+                        method = methods[i];
+                    }
+                }
+                if (method == null) {
+                    LOG.severe("Couldn't find corresponding method for event of type "
+                               + eventClass.getCanonicalName() + " in event sink interface"
+                               + endpointInterface.getCanonicalName());
+                    return;
+                }
+                param = event;
+            }
+            
+            method.invoke(proxy, param);
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+    }
+    
+    @SuppressWarnings({
+        "unchecked", "rawtypes"
+    })
+    protected JAXBElement<?> convertToJAXBElement(Object evt) {
+        final Class<?> eventClass = evt.getClass();
+        String tns = endpointInterface.getAnnotation(WebService.class).targetNamespace();
+        return new JAXBElement(new QName(tns, eventClass.getName()), eventClass, evt);
+    }
+
+    protected Object getProxy(Class<?> sinkInterface, Class<?>... extraClasses) {
+        //needed SOAP handlers
+        ReferenceParametersAddingHandler handler = new
+                ReferenceParametersAddingHandler(
+                target.getNotificationReferenceParams());
+        
+        JaxWsProxyFactoryBean service = new JaxWsProxyFactoryBean();
+        service.getOutInterceptors().add(new LoggingOutInterceptor());
+        service.setServiceClass(sinkInterface);
+        service.setAddress(target.getTargetURL());
+        service.getHandlers().add(handler);
+
+        // do we need to apply a filter?
+        if (target.getFilter() != null && target.getFilter().getContent().size() > 0) {
+            service.getOutInterceptors().add(new FilteringInterceptor(target.getFilter()));
+        }
+        
+        if (extraClasses != null && extraClasses.length > 0) {
+            Map<String, Object> props = new HashMap<String, Object>();
+            props.put("jaxb.additionalContextClasses", extraClasses);
+            service.getClientFactoryBean().getServiceFactory().setProperties(props);
+        }
+
+        return service.create();
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificatorService.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificatorService.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificatorService.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/EventSinkInterfaceNotificatorService.java Sat May 18 12:17:39 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification;
+
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+
+/**
+ * A NotificatorService dispatching events according to a known event sink interface
+ */
+public abstract class EventSinkInterfaceNotificatorService extends NotificatorService {
+
+    protected abstract Class<?> getEventSinkInterface();
+
+    @Override
+    protected void submitNotificationTask(SubscriptionTicket ticket, Object event) {
+        service.submit(new EventSinkInterfaceNotificationTask(ticket, event, getEventSinkInterface()));
+    }
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/FilteringInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/FilteringInterceptor.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/FilteringInterceptor.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/FilteringInterceptor.java Sat May 18 12:17:39 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification;
+
+import java.util.logging.Logger;
+
+import org.apache.cxf.binding.soap.SoapMessage;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.ws.eventing.FilterType;
+import org.apache.cxf.ws.eventing.shared.utils.FilteringUtil;
+
+public class FilteringInterceptor  extends AbstractPhaseInterceptor<SoapMessage> {
+
+    private static final Logger LOG = LogUtils.getLogger(FilteringInterceptor.class);
+
+    private FilterType filter;
+
+    public FilteringInterceptor(FilterType filter) {
+        super(Phase.POST_MARSHAL);
+        this.filter = filter;
+    }
+
+
+    @Override
+    public void handleMessage(SoapMessage message) throws Fault {
+        if (filter == null || filter.getContent() == null) {
+            LOG.info("No filter for this subscription");
+            return;
+        }
+        javax.xml.soap.SOAPMessage msg = message.getContent(javax.xml.soap.SOAPMessage.class);
+        if (!FilteringUtil.runFilterOnMessage(msg, filter)) {
+            message.getInterceptorChain().abort();
+        }
+    }
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/NotificatorService.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/NotificatorService.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/NotificatorService.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/NotificatorService.java Sat May 18 12:17:39 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification;
+
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.logging.Logger;
+
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+import org.apache.cxf.ws.eventing.backend.manager.SubscriptionManagerInterfaceForNotificators;
+
+
+/**
+ * The service which takes care of notifying subscribers about events. Has access to the subscription database.
+ * Receives events from compliant Emitters, eg. EmitterServlet / EmitterMBean,..
+ * Don't forget to use the 'stop' method, especially if running inside a servlet container!!
+ * Suggested approach for a web container is to instantiate this class in a ServletContextListener
+ * and then have it stopped using the same listener. If you don't call 'stop' upon undeployment,
+ * the underlying ExecutorService will not be shut down, leaking resources.
+ */
+public abstract class NotificatorService {
+
+    public static final int CORE_POOL_SIZE = 15;
+    protected static final Logger LOG = LogUtils.getLogger(NotificatorService.class);
+    protected ExecutorService service;
+
+    public NotificatorService() {
+    }
+
+    protected abstract SubscriptionManagerInterfaceForNotificators obtainManager();
+
+    public void dispatchEvent(Object event) {
+        if (service == null) {
+            throw new IllegalStateException("NotificatorService is not started. "
+                    + "Please call the start() method before passing any events to it.");
+        }
+        for (SubscriptionTicket ticket : obtainManager().getTickets()) {
+            if (!ticket.isExpired()) {
+                submitNotificationTask(ticket, event);
+            } else {
+                LOG.info("Ticket expired at " + ticket.getExpires().toXMLFormat());
+            }
+        }
+    }
+
+    protected abstract void submitNotificationTask(SubscriptionTicket ticket, Object event);
+
+    public void subscriptionEnd(SubscriptionTicket ticket, String reason, SubscriptionEndStatus status) {
+        LOG.info("NotificatorService will notify about subscription end for ticket=" + ticket.getUuid()
+            + "; reason=" + reason);
+        service.submit(new SubscriptionEndNotificationTask(ticket, reason, status));
+    }
+
+
+    /**
+     * Starts this NotificatorService. You MUST run this method on every instance
+     * before starting to pass any events to it. Run it only once.
+     */
+    public void start() {
+        obtainManager().registerNotificator(this);
+        service = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE);
+    }
+
+    /**
+     * Shuts down the NotificatorService. This method is a MUST if you are running it inside a servlet container,
+     * because it will shutdown the underlying ExecutorService.
+     */
+    public void stop() {
+        service.shutdown();
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndNotificationTask.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndNotificationTask.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndNotificationTask.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndNotificationTask.java Sat May 18 12:17:39 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification;
+
+
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.apache.cxf.ws.eventing.LanguageSpecificStringType;
+import org.apache.cxf.ws.eventing.SubscriptionEnd;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+import org.apache.cxf.ws.eventing.client.EndToEndpoint;
+import org.apache.cxf.ws.eventing.shared.handlers.ReferenceParametersAddingHandler;
+
+public class SubscriptionEndNotificationTask implements Runnable {
+
+    private SubscriptionTicket target;
+    private String reason;
+    private SubscriptionEndStatus status;
+
+    public SubscriptionEndNotificationTask(SubscriptionTicket ticket, String reason,
+                                           SubscriptionEndStatus status) {
+        this.target = ticket;
+        this.reason = reason;
+        this.status = status;
+    }
+
+    @Override
+    public void run() {
+        try {
+            // needed SOAP handlers
+            ReferenceParametersAddingHandler handler = new
+                    ReferenceParametersAddingHandler(
+                    target.getNotificationReferenceParams());
+            JaxWsProxyFactoryBean service = new JaxWsProxyFactoryBean();
+            service.getOutInterceptors().add(new LoggingOutInterceptor());
+            service.setServiceClass(EndToEndpoint.class);
+            service.setAddress(target.getEndToURL());
+            service.getHandlers().add(handler);
+
+            EndToEndpoint endpoint = (EndToEndpoint)service.create();
+            SubscriptionEnd message = new SubscriptionEnd();
+            message.setStatus(status.toString());
+            if (reason != null) {
+                LanguageSpecificStringType reasonElement = new LanguageSpecificStringType();
+                reasonElement.setLang("en-US");
+                reasonElement.setValue(reason);
+                message.getReason().add(reasonElement);
+            }
+            endpoint.subscriptionEnd(message);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndStatus.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndStatus.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndStatus.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/SubscriptionEndStatus.java Sat May 18 12:17:39 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification;
+
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+public enum SubscriptionEndStatus {
+
+    DELIVERY_FAILURE(EventingConstants.SUBSCRIPTION_END_DELIVERY_FAILURE),
+    SOURCE_SHUTTING_DOWN(EventingConstants.SUBSCRIPTION_END_SHUTTING_DOWN),
+    SOURCE_CANCELLING(EventingConstants.SUBSCRIPTION_END_SOURCE_CANCELLING);
+
+    private String namespace;
+
+    private SubscriptionEndStatus(String namespace) {
+        this.namespace = namespace;
+    }
+
+    @Override
+    public String toString() {
+        return namespace;
+    }
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/WrappedSink.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/WrappedSink.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/WrappedSink.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/WrappedSink.java Sat May 18 12:17:39 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.ws.eventing.backend.notification;
+
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebService;
+import javax.jws.soap.SOAPBinding;
+import javax.xml.ws.Action;
+import javax.xml.ws.soap.Addressing;
+
+import org.apache.cxf.ws.eventing.EventType;
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+@WebService(targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE,
+            name = EventingConstants.WRAPPED_SINK_PORT_TYPE)
+@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
+@Addressing(enabled = true, required = true)
+public interface WrappedSink {
+
+    @Oneway
+    @Action(input = EventingConstants.ACTION_NOTIFY_EVENT_WRAPPED_DELIVERY)
+    @WebMethod(operationName = EventingConstants.OPERATION_NOTIFY_EVENT)
+    void notifyEvent(
+        @WebParam(partName = EventingConstants.PARAMETER, name = EventingConstants.NOTIFY,
+                  targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE)
+        EventType parameter
+    );
+}
\ No newline at end of file

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/Emitter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/Emitter.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/Emitter.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/Emitter.java Sat May 18 12:17:39 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification.emitters;
+
+public interface Emitter {
+
+    void dispatch(Object event);
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/EmitterImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/EmitterImpl.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/EmitterImpl.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/backend/notification/emitters/EmitterImpl.java Sat May 18 12:17:39 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.backend.notification.emitters;
+
+import org.apache.cxf.ws.eventing.backend.notification.NotificatorService;
+
+public class EmitterImpl implements Emitter {
+
+    private final NotificatorService service;
+
+    public EmitterImpl(NotificatorService service) {
+        this.service = service;
+    }
+
+    @Override
+    public void dispatch(Object event) {
+        service.dispatchEvent(event);
+    }
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/client/EndToEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/client/EndToEndpoint.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/client/EndToEndpoint.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/client/EndToEndpoint.java Sat May 18 12:17:39 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.client;
+
+
+import javax.jws.WebParam;
+import javax.jws.WebService;
+import javax.jws.soap.SOAPBinding;
+import javax.xml.ws.Action;
+import javax.xml.ws.soap.Addressing;
+
+import org.apache.cxf.ws.eventing.SubscriptionEnd;
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+@WebService
+@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
+@Addressing(enabled = true, required = true)
+public interface EndToEndpoint {
+
+    @Action(
+            input = EventingConstants.ACTION_SUBSCRIPTION_END
+    )
+    void subscriptionEnd(@WebParam SubscriptionEnd subscriptionEnd);
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/AbstractEventSource.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/AbstractEventSource.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/AbstractEventSource.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/AbstractEventSource.java Sat May 18 12:17:39 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.eventsource;
+
+import java.util.logging.Logger;
+
+import javax.annotation.Resource;
+import javax.xml.ws.WebServiceContext;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ws.eventing.Subscribe;
+import org.apache.cxf.ws.eventing.SubscribeResponse;
+import org.apache.cxf.ws.eventing.backend.manager.SubscriptionManagerInterfaceForEventSources;
+import org.apache.cxf.ws.eventing.backend.manager.SubscriptionTicketGrantingResponse;
+import org.apache.cxf.ws.eventing.shared.utils.DurationAndDateUtil;
+
+/**
+ * Default implementation of Event Source web service.
+ */
+public abstract class AbstractEventSource implements EventSourceEndpoint {
+
+    protected static final Logger LOG = LogUtils.getLogger(AbstractEventSource.class);
+
+    @Resource
+    protected WebServiceContext context;
+
+    public AbstractEventSource() {
+    }
+
+    @Override
+    public SubscribeResponse subscribeOp(Subscribe body) {
+        SubscriptionTicketGrantingResponse databaseResponse = getSubscriptionManagerBackend()
+                .subscribe(body.getDelivery(), body.getEndTo(), body.getExpires(), body.getFilter(),
+                        body.getFormat());
+        boolean shouldConvertToDuration;
+        if (body.getExpires() != null) {
+            shouldConvertToDuration = DurationAndDateUtil.isDuration(body.getExpires().getValue());
+        } else {
+            shouldConvertToDuration = true;
+        }
+        return generateResponseMessageFor(databaseResponse, shouldConvertToDuration);
+    }
+
+    protected abstract SubscriptionManagerInterfaceForEventSources getSubscriptionManagerBackend();
+
+    protected SubscribeResponse generateResponseMessageFor(SubscriptionTicketGrantingResponse dbResponse,
+                                                           boolean shouldConvertToDuration) {
+        SubscribeResponse ret = new SubscribeResponse();
+        // SubscriptionManager part
+        ret.setSubscriptionManager(dbResponse.getSubscriptionManagerReference());
+        // Expires part
+        if (shouldConvertToDuration) {
+            ret.setGrantedExpires(
+                    DurationAndDateUtil.toExpirationTypeContainingDuration(dbResponse.getExpires()));
+        } else {
+            ret.setGrantedExpires(
+                    DurationAndDateUtil.toExpirationTypeContainingGregorianCalendar(dbResponse.getExpires()));
+        }
+        return ret;
+    }
+
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/EventSourceEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/EventSourceEndpoint.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/EventSourceEndpoint.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/eventsource/EventSourceEndpoint.java Sat May 18 12:17:39 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.eventsource;
+
+import java.io.IOException;
+
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.jws.soap.SOAPBinding;
+import javax.xml.ws.Action;
+import javax.xml.ws.soap.Addressing;
+
+import org.apache.cxf.ws.eventing.Subscribe;
+import org.apache.cxf.ws.eventing.SubscribeResponse;
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+
+/**
+ * The interface definition of an Event Source web service, according to the specification.
+ * See http://www.w3.org/TR/ws-eventing/#Subscribe
+ */
+@WebService(targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE)
+@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
+@Addressing(enabled = true, required = true)
+public interface EventSourceEndpoint {
+
+
+    /**
+     * The Subscribe operation of the Event Source.
+     * See http://www.w3.org/TR/ws-eventing/#Subscribe
+     * @param body JAXB class Subscribe representing the body of the subscription request
+     * @return JAXB class SubscribeResponse representing the response for the requester
+     */
+    @Action(
+            input = EventingConstants.ACTION_SUBSCRIBE,
+            output = EventingConstants.ACTION_SUBSCRIBE_RESPONSE
+    )
+    @WebResult(name = EventingConstants.RESPONSE_SUBSCRIBE)
+    SubscribeResponse subscribeOp(
+            @WebParam(name = EventingConstants.OPERATION_SUBSCRIBE,
+                    targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE, partName = "body")
+            Subscribe body) throws IOException;
+
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/AbstractSubscriptionManager.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/AbstractSubscriptionManager.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/AbstractSubscriptionManager.java Sat May 18 12:17:39 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.manager;
+
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import javax.annotation.Resource;
+import javax.xml.ws.WebServiceContext;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jaxws.context.WrappedMessageContext;
+import org.apache.cxf.ws.eventing.ExpirationType;
+import org.apache.cxf.ws.eventing.GetStatus;
+import org.apache.cxf.ws.eventing.GetStatusResponse;
+import org.apache.cxf.ws.eventing.Renew;
+import org.apache.cxf.ws.eventing.RenewResponse;
+import org.apache.cxf.ws.eventing.Unsubscribe;
+import org.apache.cxf.ws.eventing.UnsubscribeResponse;
+import org.apache.cxf.ws.eventing.backend.database.SubscriptionTicket;
+import org.apache.cxf.ws.eventing.backend.manager.SubscriptionManagerInterfaceForManagers;
+import org.apache.cxf.ws.eventing.shared.faults.UnknownSubscription;
+import org.apache.cxf.ws.eventing.shared.utils.DurationAndDateUtil;
+
+public abstract class AbstractSubscriptionManager implements SubscriptionManagerEndpoint {
+
+    protected static final Logger LOG = LogUtils.getLogger(AbstractSubscriptionManager.class);
+
+    @Resource
+    protected WebServiceContext context;
+
+    public AbstractSubscriptionManager() {
+    }
+
+
+    @Override
+    public RenewResponse renewOp(Renew body) {
+        RenewResponse response = new RenewResponse();
+        String uuid = retrieveSubscriptionUUID();
+        LOG.info("received Renew message for UUID=" + uuid);
+        ExpirationType expiration = getSubscriptionManagerBackend()
+                .renew(UUID.fromString(uuid), body.getExpires());
+        response.setGrantedExpires(expiration);
+        LOG.info("Extended subscription for UUID=" + uuid + " to " + expiration.getValue());
+        return response;
+    }
+
+    @Override
+    public GetStatusResponse getStatusOp(GetStatus body) {
+        String uuid = retrieveSubscriptionUUID();
+        LOG.info("received GetStatus message for UUID=" + uuid);
+        SubscriptionTicket ticket = obtainTicketFromDatabaseOrThrowFault(uuid);
+        GetStatusResponse response = new GetStatusResponse();
+        response.setGrantedExpires(
+                DurationAndDateUtil.toExpirationTypeContainingGregorianCalendar(ticket.getExpires()));
+        return response;
+    }
+
+    @Override
+    public UnsubscribeResponse unsubscribeOp(Unsubscribe body) {
+        String uuid = retrieveSubscriptionUUID();
+        LOG.info("received Unsubscribe message for UUID=" + uuid);
+        getSubscriptionManagerBackend().unsubscribeTicket(UUID.fromString(uuid));
+        LOG.info("successfully removed subscription with UUID " + uuid);
+        return new UnsubscribeResponse();
+    }
+
+    protected abstract SubscriptionManagerInterfaceForManagers getSubscriptionManagerBackend();
+
+    /**
+     * Retrieves the subscription's uuid as it was specified in SOAP header.
+     * Messages sent to SubscriptionManager by clients always need to specify the uuid.
+     *
+     * @return the uuid of the subscription specified in this message's headers. Note:
+     *         obtaining this doesn't yet make sure that this subscription actually exists.
+     */
+    protected String retrieveSubscriptionUUID() {
+        Object uuid = ((WrappedMessageContext)context.getMessageContext()).getWrappedMessage()
+                .getContextualProperty("uuid");
+        if (uuid == null) {
+            throw new UnknownSubscription();
+        }
+        if (uuid.getClass() != String.class) {
+            throw new Error("Subscription ID should be a String but is " + uuid.getClass().getName());
+        }
+        return (String)uuid;
+    }
+
+    /**
+     * searches the subscription database for a subscription by the given UUID
+     *
+     * @param uuid
+     * @return the SubscriptionTicket, or throws UnknownSubscription fault if no such subscription exists
+     */
+    protected SubscriptionTicket obtainTicketFromDatabaseOrThrowFault(String uuid) {
+        SubscriptionTicket ticket = getSubscriptionManagerBackend().findTicket(UUID.fromString(uuid));
+        if (ticket == null) {
+            LOG.severe("Unknown ticket UUID: " + uuid);
+            throw new UnknownSubscription();
+        }
+        return ticket;
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/SubscriptionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/SubscriptionManagerEndpoint.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/SubscriptionManagerEndpoint.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/manager/SubscriptionManagerEndpoint.java Sat May 18 12:17:39 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.manager;
+
+import javax.jws.HandlerChain;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.jws.soap.SOAPBinding;
+import javax.xml.ws.Action;
+import javax.xml.ws.soap.Addressing;
+
+import org.apache.cxf.ws.eventing.GetStatus;
+import org.apache.cxf.ws.eventing.GetStatusResponse;
+import org.apache.cxf.ws.eventing.Renew;
+import org.apache.cxf.ws.eventing.RenewResponse;
+import org.apache.cxf.ws.eventing.Unsubscribe;
+import org.apache.cxf.ws.eventing.UnsubscribeResponse;
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+/**
+ * The interface definition of a Subscription Manager web service, according to the specification.
+ */
+@WebService(targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE)
+@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
+@Addressing(enabled = true, required = true)
+@HandlerChain(file = "/subscription-reference-parsing-handler-chain.xml")
+public interface SubscriptionManagerEndpoint {
+
+    /**
+     * The Renew operation of the Subscription Manager
+     * See http://www.w3.org/TR/ws-eventing/#Renew
+     * @param body JAXB class Renew representing the body of the renew request
+     * @return JAXB class RenewResponse representing the response for the requester
+     */
+    @Action(
+            input = EventingConstants.ACTION_RENEW,
+            output = EventingConstants.ACTION_RENEW_RESPONSE
+    )
+    @WebResult(name = EventingConstants.RESPONSE_RENEW)
+    RenewResponse renewOp(
+            @WebParam(name = EventingConstants.OPERATION_RENEW,
+                    targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE, partName = "body")
+            Renew body
+    );
+
+    /**
+     * The GetStatus operation of the Subscription Manager
+     * See http://www.w3.org/TR/ws-eventing/#GetStatus
+     * @param body JAXB class GetStatus representing the body of the GetStatus request
+     * @return JAXB class GetStatusResponse representing the response for the requester
+     */
+    @Action(
+            input = EventingConstants.ACTION_GET_STATUS,
+            output = EventingConstants.ACTION_GET_STATUS_RESPONSE
+    )
+    @WebResult(name = EventingConstants.RESPONSE_GET_STATUS)
+    GetStatusResponse getStatusOp(
+            @WebParam(name = EventingConstants.OPERATION_GET_STATUS,
+                    targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE, partName = "body")
+            GetStatus body
+    );
+
+    /**
+     * The Unsubscribe operation of the Subscription Manager
+     * See http://www.w3.org/TR/ws-eventing/#Unsubscribe
+     * @param body JAXB class Unsubscribe representing the body of the Unsubscribe request
+     * @return JAXB class UnsubscribeResponse representing the response for the requester
+     */
+    @Action(
+            input = EventingConstants.ACTION_UNSUBSCRIBE,
+            output = EventingConstants.ACTION_UNSUBSCRIBE_RESPONSE
+    )
+    @WebResult(name = EventingConstants.RESPONSE_UNSUBSCRIBE)
+    UnsubscribeResponse unsubscribeOp(
+            @WebParam(name = EventingConstants.OPERATION_UNSUBSCRIBE,
+                    targetNamespace = EventingConstants.EVENTING_2011_03_NAMESPACE, partName = "body")
+            Unsubscribe body
+    );
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/EventingConstants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/EventingConstants.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/EventingConstants.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/EventingConstants.java Sat May 18 12:17:39 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.shared;
+
+/**
+ * This class contains String constants needed for WS-Eventing.
+ */
+public final class EventingConstants {
+
+    public static final String ACTION_SUBSCRIBE = "http://www.w3.org/2011/03/ws-evt/Subscribe";
+    public static final String ACTION_SUBSCRIBE_RESPONSE
+        = "http://www.w3.org/2011/03/ws-evt/SubscribeResponse";
+
+    public static final String ACTION_RENEW = "http://www.w3.org/2011/03/ws-evt/Renew";
+    public static final String ACTION_RENEW_RESPONSE = "http://www.w3.org/2011/03/ws-evt/RenewResponse";
+
+    public static final String ACTION_GET_STATUS = "http://www.w3.org/2011/03/ws-evt/GetStatus";
+    public static final String ACTION_GET_STATUS_RESPONSE
+        = "http://www.w3.org/2011/03/ws-evt/GetStatusResponse";
+
+    public static final String ACTION_UNSUBSCRIBE = "http://www.w3.org/2011/03/ws-evt/Unsubscribe";
+    public static final String ACTION_UNSUBSCRIBE_RESPONSE
+        = "http://www.w3.org/2011/03/ws-evt/UnsubscribeResponse";
+
+    public static final String ACTION_FAULT = "http://www.w3.org/2011/03/ws-evt/fault";
+
+    public static final String EVENTING_2011_03_NAMESPACE = "http://www.w3.org/2011/03/ws-evt";
+
+    public static final String RESPONSE_RENEW = "RenewResponse";
+    
+    public static final String WRAPPED_SINK_PORT_TYPE = "WrappedSinkPortType";
+
+    public static final String RESPONSE_SUBSCRIBE = "SubscribeResponse";
+    public static final String OPERATION_SUBSCRIBE = "Subscribe";
+    public static final String OPERATION_RENEW = "Renew";
+    public static final String RESPONSE_GET_STATUS = "GetStatusResponse";
+    public static final String OPERATION_GET_STATUS = "GetStatus";
+    public static final String RESPONSE_UNSUBSCRIBE = "UnsubscribeResponse";
+    public static final String OPERATION_UNSUBSCRIBE = "Unsubscribe";
+    public static final String NOTIFY = "Notify";
+    public static final String PARAMETER = "parameter";
+    public static final String OPERATION_NOTIFY_EVENT = "NotifyEvent";
+    public static final String ACTION_SUBSCRIPTION_END = "http://www.w3.org/2011/03/ws-evt/SubscriptionEnd";
+    public static final String ACTION_NOTIFY_EVENT_WRAPPED_DELIVERY
+        = "http://www.w3.org/2011/03/ws-evt/WrappedSinkPortType/NotifyEvent";
+
+    public static final String DELIVERY_FORMAT_WRAPPED
+        = "http://www.w3.org/2011/03/ws-evt/DeliveryFormats/Wrap";
+    public static final String DELIVERY_FORMAT_UNWRAPPED
+        = "http://www.w3.org/2011/03/ws-evt/DeliveryFormats/Unwrap";
+
+    public static final String SUBSCRIPTION_ID_DEFAULT_NAMESPACE = "http://cxf.apache.org/ws-eventing";
+    public static final String SUBSCRIPTION_ID_DEFAULT_ELEMENT_NAME = "SubscriptionID";
+
+    public static final String SUBSCRIPTION_END_DELIVERY_FAILURE
+        = "http://www.w3.org/2011/03/ws-evt/DeliveryFailure";
+    public static final String SUBSCRIPTION_END_SHUTTING_DOWN
+        = "http://www.w3.org/2011/03/ws-evt/SourceShuttingDown";
+    public static final String SUBSCRIPTION_END_SOURCE_CANCELLING
+        = "http://www.w3.org/2011/03/ws-evt/SourceCancelling";
+
+
+    private EventingConstants() {
+
+    }
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/CannotProcessFilter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/CannotProcessFilter.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/CannotProcessFilter.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/CannotProcessFilter.java Sat May 18 12:17:39 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.shared.faults;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+public class CannotProcessFilter extends WSEventingFault {
+
+    public static final String REASON = "Cannot filter as requested.";
+    public static final String LOCAL_PART = "CannotProcessFilter";
+
+    public CannotProcessFilter() {
+        super(REASON,
+                null,
+                new QName(EventingConstants.EVENTING_2011_03_NAMESPACE, LOCAL_PART));
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/DeliveryFormatRequestedUnavailable.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/DeliveryFormatRequestedUnavailable.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/DeliveryFormatRequestedUnavailable.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/DeliveryFormatRequestedUnavailable.java Sat May 18 12:17:39 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.shared.faults;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+public class DeliveryFormatRequestedUnavailable extends WSEventingFault {
+
+    public static final String REASON = "The requested delivery format is not supported.";
+    public static final String LOCAL_PART = "DeliveryFormatRequestedUnavailable";
+
+    public DeliveryFormatRequestedUnavailable() {
+        super(REASON,
+                null,
+                new QName(EventingConstants.EVENTING_2011_03_NAMESPACE, LOCAL_PART));
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EmptyFilter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EmptyFilter.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EmptyFilter.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EmptyFilter.java Sat May 18 12:17:39 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.shared.faults;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+public class EmptyFilter extends WSEventingFault {
+
+    public static final String REASON = "The wse:Filter would result in zero notifications.";
+    public static final String LOCAL_PART = "EmptyFilter";
+
+    public EmptyFilter() {
+        super(REASON,
+                null,
+                new QName(EventingConstants.EVENTING_2011_03_NAMESPACE, LOCAL_PART));
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EndToNotSupported.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EndToNotSupported.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EndToNotSupported.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/EndToNotSupported.java Sat May 18 12:17:39 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.shared.faults;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+public class EndToNotSupported extends WSEventingFault {
+
+    public static final String REASON = "wse:EndTo semantics is not supported.";
+    public static final String LOCAL_PART = "EndToNotSupported";
+
+    public EndToNotSupported() {
+        super(REASON,
+                null,
+                new QName(EventingConstants.EVENTING_2011_03_NAMESPACE, LOCAL_PART));
+    }
+
+}

Added: cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/FilteringRequestedUnavailable.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/FilteringRequestedUnavailable.java?rev=1484099&view=auto
==============================================================================
--- cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/FilteringRequestedUnavailable.java (added)
+++ cxf/trunk/rt/ws/eventing/src/main/java/org/apache/cxf/ws/eventing/shared/faults/FilteringRequestedUnavailable.java Sat May 18 12:17:39 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.eventing.shared.faults;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.ws.eventing.shared.EventingConstants;
+
+public class FilteringRequestedUnavailable extends WSEventingFault {
+
+    public static final String REASON = "The requested filter dialect is not supported.";
+    public static final String LOCAL_PART = "FilteringRequestedUnavailable";
+
+    public FilteringRequestedUnavailable() {
+        super(REASON,
+                null,
+                new QName(EventingConstants.EVENTING_2011_03_NAMESPACE, LOCAL_PART));
+    }
+
+}