You are viewing a plain text version of this content. The canonical link for it is here.
Posted to wave-commits@incubator.apache.org by yu...@apache.org on 2016/05/06 12:06:49 UTC
[2/3] incubator-wave git commit: WAVE-438 - Removes XMPP federation
implementation along with relevant resources and unit tests. Fixes issue with
a test when running with Java1.8.
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java
deleted file mode 100644
index a28bd72..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-import com.google.protobuf.ByteString;
-import com.typesafe.config.Config;
-import org.apache.commons.codec.binary.Base64;
-import org.dom4j.Attribute;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.FederationRemoteBridge;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider;
-import org.waveprotocol.wave.federation.xmpp.XmppUtil.UnknownSignerType;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Logger;
-
-/**
- * Remote implementation. Receives submit and history requests from the local
- * wave server and sends them to a remote wave server Host, and also receives
- * update messages from a remote wave server Host and sends them to the local
- * wave server.
- */
-public class XmppFederationRemote implements WaveletFederationProvider {
- private static final Logger LOG = Logger.getLogger(XmppFederationRemote.class.getCanonicalName());
-
- // Timeout, in seconds, for outstanding provider calls sent over XMPP.
- private static final int XMPP_PROVIDER_TIMEOUT = 30;
-
- private final WaveletFederationListener.Factory updatesListenerFactory;
- private final XmppDisco disco;
- private final String jid;
-
- private XmppManager manager = null;
-
- /**
-  * Creates the remote half of XMPP federation. The instance is not ready for
-  * use until {@link #setManager} has been called.
-  *
-  * @param updatesListenerFactory used to communicate back to the local wave
-  *        server when an update arrives.
-  * @param disco used to discover the JID of a remote wave server.
-  * @param config configuration from which the {@code federation.xmpp_jid}
-  *        string is read; that value is set as the "from" address of every
-  *        request built by this class.
-  */
- @Inject
- public XmppFederationRemote(
-     @FederationRemoteBridge WaveletFederationListener.Factory updatesListenerFactory,
-     XmppDisco disco, Config config) {
-   this.jid = config.getString("federation.xmpp_jid");
-   this.disco = disco;
-   this.updatesListenerFactory = updatesListenerFactory;
- }
-
- /**
- * Set the manager instance for this class. Must be invoked before any other
- * methods are used.
- *
- * @param manager the XMPP manager through which outgoing requests are sent.
- */
- public void setManager(XmppManager manager) {
- this.manager = manager;
- }
-
- /**
-  * Submits a signed delta to the remote wave server on behalf of the local
-  * wave server. Part of the WaveletFederationProvider interface.
-  *
-  * <p>A pubsub {@code publish} IQ for node {@code wavelet} is built, the JID
-  * for the wavelet's domain is discovered, and the IQ is then sent through the
-  * manager. The listener is told about the parsed response, or about a failure
-  * if the wavelet name cannot be encoded, discovery fails, or the request
-  * reports an error.
-  *
-  * @param waveletName name of wavelet.
-  * @param signedDelta delta signed by the submitting wave server.
-  * @param listener callback for the result of the submit.
-  */
- @Override
- public void submitRequest(final WaveletName waveletName,
-     final ProtocolSignedDelta signedDelta,
-     final SubmitResultListener listener) {
-   final IQ request = new IQ(IQ.Type.set);
-   request.setID(XmppUtil.generateUniqueId());
-
-   LOG.info("Submitting delta to remote server, wavelet " + waveletName);
-   request.setFrom(jid);
-
-   // pubsub > publish[node=wavelet] > item > submit-request > delta
-   Element publishElement =
-       request.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB).addElement("publish");
-   publishElement.addAttribute("node", "wavelet");
-   Element delta = publishElement.addElement("item")
-       .addElement("submit-request", XmppNamespace.NAMESPACE_WAVE_SERVER)
-       .addElement("delta");
-   delta.addCDATA(Base64Util.encode(signedDelta.toByteArray()));
-
-   final String encodedName;
-   try {
-     encodedName = XmppUtil.waveletNameCodec.waveletNameToURI(waveletName);
-   } catch (EncodingException e) {
-     listener.onFailure(FederationErrors.badRequest(
-         "Couldn't encode wavelet name " + waveletName));
-     return;
-   }
-   delta.addAttribute("wavelet-name", encodedName);
-
-   // Routes the eventual reply (or transport-level error) back to the listener.
-   final PacketCallback onReply = new PacketCallback() {
-     @Override
-     public void error(FederationError error) {
-       listener.onFailure(error);
-     }
-
-     @Override
-     public void run(Packet packet) {
-       processSubmitResponse(packet, listener);
-     }
-   };
-
-   SuccessFailCallback<String, String> onDiscovery = new SuccessFailCallback<String, String>() {
-     @Override
-     public void onSuccess(String remoteJid) {
-       Preconditions.checkNotNull(remoteJid);
-       request.setTo(remoteJid);
-       manager.send(request, onReply, XMPP_PROVIDER_TIMEOUT);
-     }
-
-     @Override
-     public void onFailure(String errorMessage) {
-       // TODO(thorogood): Broken, Disco should return the error (and it
-       // should be timeout/etc)
-       listener.onFailure(FederationErrors.badRequest(
-           "No such wave server " + waveletName.waveletId.getDomain() + ": " + errorMessage));
-     }
-   };
-   disco.discoverRemoteJid(waveletName.waveletId.getDomain(), onDiscovery);
- }
-
- /**
-  * Retrieve delta history for the given wavelet.
-  *
-  * <p>Part of the WaveletFederationProvider interface. Builds a pubsub
-  * {@code items} IQ for node {@code wavelet} carrying a {@code delta-history}
-  * element, discovers the JID for {@code domain} and sends the request through
-  * the manager. The listener is told about the parsed response, or about a
-  * failure if the wavelet name cannot be encoded, discovery fails, or the
-  * request reports an error.
-  *
-  * @param waveletName name of wavelet.
-  * @param domain the remote Federation Host
-  * @param startVersion beginning of range (inclusive), minimum 0.
-  * @param endVersion end of range (exclusive).
-  * @param lengthLimit estimated size, in bytes, as an upper limit on the
-  *        amount of data returned. Only sent when greater than zero.
-  * @param listener callback for the result.
-  */
- @Override
- public void requestHistory(final WaveletName waveletName,
-     final String domain,
-     ProtocolHashedVersion startVersion,
-     ProtocolHashedVersion endVersion,
-     long lengthLimit,
-     final WaveletFederationProvider.HistoryResponseListener listener) {
-   final IQ historyIq = new IQ(IQ.Type.get);
-   historyIq.setID(XmppUtil.generateUniqueId());
-
-   LOG.info("Getting history from remote server, wavelet " + waveletName
-       + " version " + startVersion + " (inc) through " + endVersion
-       + " (ex)");
-   historyIq.setFrom(jid);
-
-   Element pubsub = historyIq.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
-   Element items = pubsub.addElement("items");
-   items.addAttribute("node", "wavelet");
-   Element historyDelta =
-       items.addElement("delta-history", XmppNamespace.NAMESPACE_WAVE_SERVER);
-
-   historyDelta.addAttribute("start-version", Long.toString(startVersion.getVersion()));
-   historyDelta.addAttribute("start-version-hash",
-       Base64Util.encode(startVersion.getHistoryHash()));
-   historyDelta.addAttribute("end-version", Long.toString(endVersion.getVersion()));
-   historyDelta.addAttribute("end-version-hash",
-       Base64Util.encode(endVersion.getHistoryHash()));
-   if (lengthLimit > 0) {
-     historyDelta.addAttribute("response-length-limit", Long.toString(lengthLimit));
-   }
-   try {
-     historyDelta.addAttribute("wavelet-name",
-         XmppUtil.waveletNameCodec.waveletNameToURI(waveletName));
-   } catch (EncodingException e) {
-     listener.onFailure(
-         FederationErrors.badRequest("Couldn't encode wavelet name " + waveletName));
-     return;
-   }
-
-   // Routes the eventual reply (or transport-level error) back to the listener.
-   final PacketCallback callback = new PacketCallback() {
-     @Override
-     public void error(FederationError error) {
-       listener.onFailure(error);
-     }
-
-     @Override
-     public void run(Packet packet) {
-       processHistoryResponse(packet, listener);
-     }
-   };
-
-   disco.discoverRemoteJid(domain, new SuccessFailCallback<String, String>() {
-     @Override
-     public void onSuccess(String remoteJid) {
-       Preconditions.checkNotNull(remoteJid);
-       historyIq.setTo(remoteJid);
-       manager.send(historyIq, callback, XMPP_PROVIDER_TIMEOUT);
-     }
-
-     @Override
-     public void onFailure(String errorMessage) {
-       listener.onFailure(FederationErrors.badRequest(
-           "No such wave server " + domain + ": " + errorMessage));
-     }
-   });
- }
-
- /**
-  * Asks the remote wave server for the signer information belonging to a
-  * delta. The target domain is taken from the wavelet id of
-  * {@code waveletName}.
-  *
-  * @param signerId id of the signer being requested.
-  * @param waveletName name of the wavelet the delta belongs to.
-  * @param deltaEndVersion version and history hash sent along with the request.
-  * @param listener callback for the result.
-  */
- @Override
- public void getDeltaSignerInfo(ByteString signerId, WaveletName waveletName,
-     ProtocolHashedVersion deltaEndVersion,
-     final DeltaSignerInfoResponseListener listener) {
-   final IQ signerIq = new IQ(IQ.Type.get);
-   signerIq.setID(XmppUtil.generateUniqueId());
-   signerIq.setFrom(jid);
-
-   // Extract domain from waveletId
-   final String remoteDomain = waveletName.waveletId.getDomain();
-
-   Element itemsElement =
-       signerIq.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB).addElement("items");
-   itemsElement.addAttribute("node", "signer");
-   // TODO: should allow multiple requests in the same packet
-   Element request =
-       itemsElement.addElement("signer-request", XmppNamespace.NAMESPACE_WAVE_SERVER);
-   request.addAttribute("signer-id", Base64Util.encode(signerId));
-   request.addAttribute("history-hash", Base64Util.encode(deltaEndVersion.getHistoryHash()));
-   request.addAttribute("version", String.valueOf(deltaEndVersion.getVersion()));
-
-   final String encodedName;
-   try {
-     encodedName = XmppUtil.waveletNameCodec.waveletNameToURI(waveletName);
-   } catch (EncodingException e) {
-     listener.onFailure(FederationErrors.badRequest(
-         "Couldn't encode wavelet name " + waveletName));
-     return;
-   }
-   request.addAttribute("wavelet-name", encodedName);
-
-   // Routes the eventual reply (or transport-level error) back to the listener.
-   final PacketCallback onReply = new PacketCallback() {
-     @Override
-     public void error(FederationError error) {
-       listener.onFailure(error);
-     }
-
-     @Override
-     public void run(Packet packet) {
-       processGetSignerResponse(packet, listener);
-     }
-   };
-
-   SuccessFailCallback<String, String> onDiscovery = new SuccessFailCallback<String, String>() {
-     @Override
-     public void onSuccess(String remoteJid) {
-       Preconditions.checkNotNull(remoteJid);
-       signerIq.setTo(remoteJid);
-       manager.send(signerIq, onReply, XMPP_PROVIDER_TIMEOUT);
-     }
-
-     @Override
-     public void onFailure(String errorMessage) {
-       listener.onFailure(FederationErrors.badRequest(
-           "No such wave server " + remoteDomain + ": " + errorMessage));
-     }
-   };
-   disco.discoverRemoteJid(remoteDomain, onDiscovery);
- }
-
- /**
-  * Publishes signer information to the wave server of {@code remoteDomain}.
-  *
-  * @param remoteDomain domain whose JID is discovered and used as recipient.
-  * @param signerInfo signer information serialised into the request.
-  * @param listener callback for the result.
-  */
- @Override
- public void postSignerInfo(
-     final String remoteDomain,
-     ProtocolSignerInfo signerInfo,
-     final WaveletFederationProvider.PostSignerInfoResponseListener listener) {
-   final IQ signerIq = new IQ(IQ.Type.set);
-   signerIq.setID(XmppUtil.generateUniqueId());
-   signerIq.setFrom(jid);
-
-   // pubsub > publish[node=signer] > item > (serialised signer info)
-   Element publishElement =
-       signerIq.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB).addElement("publish");
-   publishElement.addAttribute("node", "signer");
-   Element item = publishElement.addElement("item");
-   XmppUtil.protocolSignerInfoToXml(signerInfo, item);
-
-   // Routes the eventual reply (or transport-level error) back to the listener.
-   final PacketCallback onReply = new PacketCallback() {
-     @Override
-     public void error(FederationError error) {
-       listener.onFailure(error);
-     }
-
-     @Override
-     public void run(Packet packet) {
-       processPostSignerResponse(packet, listener);
-     }
-   };
-
-   SuccessFailCallback<String, String> onDiscovery = new SuccessFailCallback<String, String>() {
-     @Override
-     public void onSuccess(String remoteJid) {
-       Preconditions.checkNotNull(remoteJid);
-       signerIq.setTo(remoteJid);
-       manager.send(signerIq, onReply, XMPP_PROVIDER_TIMEOUT);
-     }
-
-     @Override
-     public void onFailure(String errorMessage) {
-       listener.onFailure(FederationErrors.badRequest(
-           "No such wave server " + remoteDomain + ": " + errorMessage));
-     }
-   };
-   disco.discoverRemoteJid(remoteDomain, onDiscovery);
- }
-
- /**
- * Handles a wavelet update message from a foreign Federation Host. Passes the
- * message to the local waveserver (synchronously) and replies.
- *
- * @param updateMessage the incoming XMPP message.
- * @param responseCallback response callback for acks and errors
- */
- public void update(final Message updateMessage, final PacketCallback responseCallback) {
- final Element receiptRequested =
- updateMessage.getChildElement("request", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
-
- // Check existence of <event>
- Element event = updateMessage.getChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT);
- if (event == null) {
- responseCallback.error(FederationErrors.badRequest("Event element missing from message"));
- return;
- }
-
- // Check existence of <items> within <event>
- Element items = event.element("items");
- if (items == null) {
- responseCallback.error(FederationErrors.badRequest(
- "Items element missing from update message"));
- return;
- }
-
- // Complain if no items have been included.
- List<Element> elements = XmppUtil.toSafeElementList(items.elements("item"));
- if (elements.isEmpty()) {
- responseCallback.error(FederationErrors.badRequest("No items included"));
- return;
- }
-
- // Create a callback latch counter and corresponding countDown runnable.
- // When the latch reaches zero, send receipt (if it was requested).
- final AtomicInteger callbackCount = new AtomicInteger(1);
- final Runnable countDown = new Runnable() {
- @Override
- public void run() {
- if (callbackCount.decrementAndGet() == 0 && receiptRequested != null) {
- Message response = XmppUtil.createResponseMessage(updateMessage);
- response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
- responseCallback.run(response);
- }
- }
- };
-
- WaveletFederationListener.WaveletUpdateCallback callback =
- new WaveletFederationListener.WaveletUpdateCallback() {
- @Override
- public void onSuccess() {
- countDown.run();
- }
-
- @Override
- public void onFailure(FederationError error) {
- // Note that we don't propogate the error, we just ack the stanza
- // and continue.
- // TODO(thorogood): We may want to rate-limit misbehaving servers
- // that are sending us invalid/malicious data.
- LOG.warning("Incoming XMPP waveletUpdate failure: " + error);
- countDown.run();
- }
- };
-
- // We must call callback once on every iteration to ensure that we send
- // response if receiptRequested != null.
- for (Element item : elements) {
- Element waveletUpdate = item.element("wavelet-update");
-
- if (waveletUpdate == null) {
- callback.onFailure(FederationErrors.badRequest(
- "wavelet-update element missing from message: " + updateMessage));
- continue;
- }
-
- final WaveletName waveletName;
- try {
- waveletName = XmppUtil.waveletNameCodec.uriToWaveletName(
- waveletUpdate.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- callback.onFailure(FederationErrors.badRequest(
- "Couldn't decode wavelet name: " + waveletUpdate.attributeValue("wavelet-name")));
- continue;
- }
-
- WaveletFederationListener listener =
- updatesListenerFactory.listenerForDomain(waveletName.waveletId.getDomain());
-
- // Submit all applied deltas to the domain-focused listener.
- ImmutableList.Builder<ByteString> builder = ImmutableList.builder();
- for (Element appliedDeltaElement :
- XmppUtil.toSafeElementList(waveletUpdate.elements("applied-delta"))) {
- builder.add(Base64Util.decode(appliedDeltaElement.getText()));
- }
- ImmutableList<ByteString> deltas = builder.build();
- if (!deltas.isEmpty()) {
- callbackCount.incrementAndGet(); // Increment required callbacks.
- listener.waveletDeltaUpdate(waveletName, deltas, callback);
- }
-
- // Optionally submit any received last committed notice.
- Element commitNoticeElement = waveletUpdate.element("commit-notice");
- if (commitNoticeElement != null) {
- ProtocolHashedVersion version = ProtocolHashedVersion.newBuilder()
- .setHistoryHash(Base64Util.decode(commitNoticeElement.attributeValue("history-hash")))
- .setVersion(Long.parseLong(commitNoticeElement.attributeValue("version"))).build();
- callbackCount.incrementAndGet(); // Increment required callbacks.
- listener.waveletCommitUpdate(waveletName, version, callback);
- }
- }
-
- // Release sentinel so that 'expected' callbacks from the WS don't invoke
- // sending a receipt.
- countDown.run();
- }
-
- /**
- * Parses the response to a submitRequest and passes the result to the correct
- * wave server.
- *
- * @param result the XMPP Packet
- * @param listener the listener to invoke with the response.
- */
- private void processSubmitResponse(Packet result, SubmitResultListener listener) {
- Element publish = null;
- Element item = null;
- Element submitResponse = null;
- Element hashedVersionElement = null;
- Element pubsub = ((IQ) result).getChildElement();
- if (pubsub != null) {
- publish = pubsub.element("publish");
- if (publish != null) {
- item = publish.element("item");
- if (item != null) {
- submitResponse = item.element("submit-response");
- if (submitResponse != null) {
- hashedVersionElement = submitResponse.element("hashed-version");
- }
- }
- }
- }
-
- if (pubsub == null || publish == null || item == null
- || submitResponse == null || hashedVersionElement == null
- || hashedVersionElement.attribute("history-hash") == null
- || hashedVersionElement.attribute("version") == null
- || submitResponse.attribute("application-timestamp") == null
- || submitResponse.attribute("operations-applied") == null) {
- LOG.severe("Unexpected submitResponse to submit request: " + result);
- listener.onFailure(FederationErrors.badRequest("Invalid submitResponse: " + result));
- return;
- }
-
- ProtocolHashedVersion.Builder hashedVersion = ProtocolHashedVersion.newBuilder();
- hashedVersion.setHistoryHash(
- Base64Util.decode(hashedVersionElement.attributeValue("history-hash")));
- hashedVersion.setVersion(Long.parseLong(hashedVersionElement.attributeValue("version")));
- long applicationTimestamp =
- Long.parseLong(submitResponse.attributeValue("application-timestamp"));
- int operationsApplied = Integer.parseInt(submitResponse.attributeValue("operations-applied"));
- listener.onSuccess(operationsApplied, hashedVersion.build(), applicationTimestamp);
- }
-
- /**
-  * Parses a response to a history request and passes the result to the wave
-  * server.
-  *
-  * <p>Exactly one listener method is invoked: {@code onFailure} if the packet
-  * has no {@code pubsub/items} element or contains an unrecognised element,
-  * otherwise {@code onSuccess} with the collected deltas, the last committed
-  * version (or {@code null} if none was reported) and the version at which
-  * the history was truncated (or -1).
-  *
-  * @param historyResponse the XMPP packet
-  * @param listener interface to the wave server
-  */
- @SuppressWarnings("unchecked")
- private void processHistoryResponse(Packet historyResponse,
-     WaveletFederationProvider.HistoryResponseListener listener) {
-   Element pubsubResponse = historyResponse.getElement().element("pubsub");
-   Element items = (pubsubResponse == null) ? null : pubsubResponse.element("items");
-   if (items == null) {
-     listener.onFailure(FederationErrors.badRequest("Bad response packet: " + historyResponse));
-     return;
-   }
-
-   long versionTruncatedAt = -1;
-   long lastCommittedVersion = -1;
-   List<ByteString> deltaList = Lists.newArrayList();
-
-   for (Element itemElement : (List<Element>) items.elements()) {
-     for (Element element : (List<Element>) itemElement.elements()) {
-       String elementName = element.getQName().getName();
-       switch (elementName) {
-         case "applied-delta":
-           String deltaBody = element.getText();
-           deltaList.add(ByteString.copyFrom(Base64.decodeBase64(deltaBody.getBytes())));
-           break;
-         case "commit-notice":
-           Attribute commitVersion = element.attribute("version");
-           if (commitVersion != null) {
-             try {
-               lastCommittedVersion = Long.parseLong(commitVersion.getValue());
-             } catch (NumberFormatException e) {
-               // An unparsable version is treated as "not reported".
-               lastCommittedVersion = -1;
-             }
-           }
-           break;
-         case "history-truncated":
-           Attribute truncVersion = element.attribute("version");
-           if (truncVersion != null) {
-             try {
-               versionTruncatedAt = Long.parseLong(truncVersion.getValue());
-             } catch (NumberFormatException e) {
-               // An unparsable version is treated as "not truncated".
-               versionTruncatedAt = -1;
-             }
-           }
-           break;
-         default:
-           // Stop here: the listener must not receive a success (or a second
-           // failure) after it has been told the response is bad.
-           listener.onFailure(FederationErrors.badRequest(
-               "Bad response packet: " + historyResponse));
-           return;
-       }
-     }
-   }
-
-   final ProtocolHashedVersion lastCommitted;
-   if (lastCommittedVersion > -1) {
-     // TODO(thorogood): fedone doesn't send a history hash, and it's arguable
-     // that it's even sane to include it.
-     // Can't set it to null - NPE
-     lastCommitted =
-         ProtocolHashedVersion.newBuilder()
-             .setVersion(lastCommittedVersion).setHistoryHash(ByteString.EMPTY)
-             .build();
-   } else {
-     lastCommitted = null;
-   }
-   listener.onSuccess(deltaList, lastCommitted, versionTruncatedAt);
- }
-
- /**
-  * Parses a GetSigner response, passes result to the waveserver.
-  *
-  * <p>The response is expected to carry {@code items/signature} below the IQ
-  * child element, with {@code domain} and {@code algorithm} attributes and a
-  * {@code certificate} child. Anything else is reported to the listener as a
-  * bad-request failure.
-  *
-  * @param packet the response packet
-  * @param listener the interface to the wave server
-  */
- private void processGetSignerResponse(Packet packet, DeltaSignerInfoResponseListener listener) {
-   IQ response = (IQ) packet;
-   // Any link of the chain may be absent in a malformed reply; treat that the
-   // same way as a missing <signature>.
-   Element pubsub = response.getChildElement();
-   Element items = (pubsub == null) ? null : pubsub.element("items");
-   Element signature = (items == null) ? null : items.element("signature");
-   if (signature == null) {
-     LOG.severe("Empty getDeltaSignerRequest response: " + response);
-     listener.onFailure(FederationErrors.badRequest("Bad getDeltaSignatureRequest response"));
-     return;
-   }
-
-   String domain = signature.attributeValue("domain");
-   String hashName = signature.attributeValue("algorithm");
-   if (domain == null || hashName == null || signature.element("certificate") == null) {
-     LOG.severe("Bad getDeltaSignerRequest response: " + response);
-     listener.onFailure(FederationErrors.badRequest("Bad getDeltaSignatureRequest response"));
-     return;
-   }
-
-   ProtocolSignerInfo signer;
-   try {
-     signer = XmppUtil.xmlToProtocolSignerInfo(signature);
-   } catch (UnknownSignerType e) {
-     listener.onFailure(FederationErrors.badRequest(e.toString()));
-     return;
-   }
-   listener.onSuccess(signer);
- }
-
- /**
-  * Parses a response to a PostSigner request, passes result to wave server.
-  *
-  * <p>Success is reported only when the response contains
-  * {@code publish/item/signature-response} below the IQ child element; any
-  * missing element results in a bad-request failure.
-  *
-  * @param packet the response XMPP packet
-  * @param listener the listener to invoke
-  */
- private void processPostSignerResponse(
-     Packet packet,
-     WaveletFederationProvider.PostSignerInfoResponseListener listener) {
-   IQ response = (IQ) packet;
-   // Walk the chain defensively: a malformed reply must fail the request
-   // rather than throw before the listener is notified.
-   Element pubsub = response.getChildElement();
-   Element publish = (pubsub == null) ? null : pubsub.element("publish");
-   Element item = (publish == null) ? null : publish.element("item");
-   if (item != null && item.element("signature-response") != null) {
-     listener.onSuccess();
-   } else {
-     listener.onFailure(FederationErrors.badRequest("No valid response"));
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java
deleted file mode 100644
index bb4e654..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.Inject;
-
-import org.waveprotocol.wave.federation.FederationTransport;
-import org.waveprotocol.wave.util.logging.Log;
-import org.xmpp.component.ComponentException;
-
-/**
- * An implementation of {@link FederationTransport} for XMPP federation.
- * Starting federation runs the injected {@link ComponentPacketTransport}; a
- * {@link ComponentException} raised while doing so is logged as a warning and
- * not rethrown.
- *
- * @author tad.glines@gmail.com (Tad Glines)
- */
-public class XmppFederationTransport implements FederationTransport {
- private static final Log LOG = Log.get(XmppFederationTransport.class);
- private final ComponentPacketTransport transport;
-
- @Inject
- XmppFederationTransport(ComponentPacketTransport transport) {
- this.transport = transport;
- }
-
- @Override
- public void startFederation() {
- try {
- transport.run();
- } catch (ComponentException e) {
- LOG.warning("couldn't connect to XMPP server:", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java
deleted file mode 100644
index e0f2fb7..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java
+++ /dev/null
@@ -1,474 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.MapMaker;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-import org.xmpp.packet.PacketError;
-
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Provides abstraction between Federation-specific code and the backing XMPP
- * transport, including support for reliable outgoing calls (i.e. calls that are
- * guaranteed to time out) and sending error responses.
- *
- * TODO(thorogood): Find a better name for this class. Suggestions include
- * PacketHandler, Switchbox, TransportConnector, ReliableRouter, ...
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class XmppManager implements IncomingPacketHandler {
- private static final Logger LOG = Logger.getLogger(XmppManager.class.getCanonicalName());
-
- /**
- * Inner static class representing a single outgoing call.
- *
- * <p>{@code responseType} is the class of the packet that was sent (the
- * manager's send method passes {@code packet.getClass()}), {@code callback}
- * is the caller's callback, and {@code timeout} is the scheduled timeout task
- * recorded through {@link #start}.
- */
- private static class OutgoingCall {
- final Class<? extends Packet> responseType;
- PacketCallback callback;
- ScheduledFuture<?> timeout;
-
- OutgoingCall(Class<? extends Packet> responseType, PacketCallback callback) {
- this.responseType = responseType;
- this.callback = callback;
- }
-
- void start(ScheduledFuture<?> timeout) {
- Preconditions.checkState(this.timeout == null); // refuse to replace an already recorded timeout
- this.timeout = timeout;
- }
- }
-
- /**
-  * Callback handed out for a single incoming packet. It neither times out nor
-  * can be cancelled; it only exists so that the success or error reply for the
-  * request can be sent conveniently. Replying more than once is rejected with
-  * an {@link IllegalStateException}.
-  */
- private class IncomingCallback implements PacketCallback {
-   private final Packet request;
-   private boolean complete = false;
-
-   IncomingCallback(Packet request) {
-     this.request = request;
-   }
-
-   @Override
-   public void error(FederationError error) {
-     markComplete();
-     sendErrorResponse(request, error);
-   }
-
-   @Override
-   public void run(Packet response) {
-     markComplete();
-     // TODO(thorogood): Check outgoing response versus stored incoming request
-     // to ensure that to/from are paired correctly?
-     transport.sendPacket(response);
-   }
-
-   /** Records that a reply is being sent, failing if one was sent before. */
-   private void markComplete() {
-     Preconditions.checkState(!complete,
-         "Must not callback multiple times for incoming packet: %s", request);
-     complete = true;
-   }
- }
-
- // Injected types that handle incoming XMPP packet types.
- private final XmppFederationHost host;
- private final XmppFederationRemote remote;
- private final XmppDisco disco;
- private final OutgoingPacketTransport transport;
- private final String jid;
-
- // Pending callbacks to outgoing requests.
- private final ConcurrentMap<String, OutgoingCall> callbacks = new MapMaker().makeMap();
- private final ScheduledExecutorService timeoutExecutor =
- Executors.newSingleThreadScheduledExecutor();
-
- /**
-  * Creates the manager and registers it with the host, remote and disco
-  * objects via their {@code setManager} methods.
-  *
-  * @param host federation host that is given this manager.
-  * @param remote federation remote that is given this manager.
-  * @param disco disco handler that is given this manager.
-  * @param transport transport used to send outgoing packets.
-  * @param config configuration from which the {@code federation.xmpp_jid}
-  *        string is read.
-  */
- @Inject
- public XmppManager(XmppFederationHost host, XmppFederationRemote remote, XmppDisco disco,
-     OutgoingPacketTransport transport, Config config) {
-   this.jid = config.getString("federation.xmpp_jid");
-   this.transport = transport;
-   this.disco = disco;
-   this.remote = remote;
-   this.host = host;
-
-   // Hand this manager to every collaborator. Eventually, this should be
-   // replaced by better Guice interface bindings.
-   host.setManager(this);
-   remote.setManager(this);
-   disco.setManager(this);
- }
-
- @Override
- public void receivePacket(final Packet packet) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Received incoming XMPP packet:\n" + packet);
- }
-
- if (packet instanceof IQ) {
- IQ iq = (IQ) packet;
- if (iq.getType().equals(IQ.Type.result) || iq.getType().equals(IQ.Type.error)) {
- // Result type, hand off to callback handler.
- response(packet);
- } else {
- processIqGetSet(iq);
- }
- } else if (packet instanceof Message) {
- Message message = (Message) packet;
- if (message.getType().equals(Message.Type.error)
- || message.getChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS) != null) {
- // Response type, hand off to callback handler.
- response(packet);
- } else {
- processMessage(message);
- }
- } else {
- sendErrorResponse(packet, FederationError.Code.BAD_REQUEST, "Unhandled packet type: "
- + packet.getElement().getQName().getName());
- }
- }
-
- /**
- * Populate the given request subclass of Packet and return it.
- *
- * <p>Sets the "to" address to {@code toJid}, assigns an ID obtained from
- * {@code XmppUtil.generateUniqueId()} and sets the "from" address to this
- * manager's JID.
- *
- * @param packet the packet to populate; it is modified in place.
- * @param toJid target JID
- * @return the same {@code packet} instance
- */
- private <V extends Packet> V createRequest(V packet, String toJid) {
- packet.setTo(toJid);
- packet.setID(XmppUtil.generateUniqueId());
- packet.setFrom(jid);
- return packet;
- }
-
- /**
- * Create a request IQ stanza with the given toJid. The stanza also carries a
- * newly generated ID and this manager's JID as its "from" address.
- *
- * @param toJid target JID
- * @return new IQ stanza
- */
- public IQ createRequestIQ(String toJid) {
- return createRequest(new IQ(), toJid);
- }
-
- /**
- * Create a request Message stanza with the given toJid. The stanza also
- * carries a newly generated ID and this manager's JID as its "from" address.
- *
- * @param toJid target JID
- * @return new Message stanza
- */
- public Message createRequestMessage(String toJid) {
- return createRequest(new Message(), toJid);
- }
-
- /**
- * Sends the given XMPP packet over the backing transport. This accepts a
- * callback which is guaranteed to be invoked at a later point, either through
- * a normal response, error response, or timeout.
- *
- * @param packet packet to be sent
- * @param callback callback to be invoked on response or timeout
- * @param timeout timeout, in seconds, for this callback
- */
- public void send(Packet packet, final PacketCallback callback, int timeout) {
- final String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom();
-
- final OutgoingCall call = new OutgoingCall(packet.getClass(), callback);
- if (callbacks.putIfAbsent(key, call) == null) {
- // Timeout runnable to be invoked on packet expiry.
- Runnable timeoutTask = new Runnable() {
- @Override
- public void run() {
- if (callbacks.remove(key, call)) {
- callback.error(
- FederationErrors.newFederationError(FederationError.Code.REMOTE_SERVER_TIMEOUT));
- } else {
- // Likely race condition where success has actually occurred. Ignore.
- }
- }
- };
- call.start(timeoutExecutor.schedule(timeoutTask, timeout, TimeUnit.SECONDS));
- transport.sendPacket(packet);
- } else {
- String msg = "Could not send packet, ID already in-flight: " + key;
- LOG.warning(msg);
-
- // Invoke the callback with an internal error.
- callback.error(
- FederationErrors.newFederationError(FederationError.Code.UNDEFINED_CONDITION, msg));
- }
- }
-
- /**
- * Cause an immediate timeout for the given packet, which is presumed to have
- * already been sent via {@link #send}.
- */
- @VisibleForTesting
- void causeImmediateTimeout(Packet packet) {
- String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom();
- OutgoingCall call = callbacks.remove(key);
- if (call != null) {
- call.callback.error(FederationErrors.newFederationError(
- FederationError.Code.REMOTE_SERVER_TIMEOUT, "Forced immediate timeout"));
- }
- }
-
- /**
- * Invoke the callback for a packet already identified as a response. This may
- * either invoke the error or normal callback as necessary.
- */
- private void response(Packet packet) {
- String key = packet.getID() + "#" + packet.getFrom() + "#" + packet.getTo();
- OutgoingCall call = callbacks.remove(key);
-
- if (call == null) {
- LOG.warning("Received response packet without paired request: " + packet.getID());
- } else {
- // Cancel the outstanding timeout.
- call.timeout.cancel(false);
-
- // Look for error condition and invoke the relevant callback.
- Element element = packet.getElement().element("error");
- if (element != null) {
- LOG.fine("Invoking error callback for: " + packet.getID());
- call.callback.error(toFederationError(new PacketError(element)));
- } else {
- if (call.responseType.equals(packet.getClass())) {
- LOG.fine("Invoking normal callback for: " + packet.getID());
- call.callback.run(packet);
- } else {
- String msg =
- "Received mismatched response packet type: expected " + call.responseType
- + ", given " + packet.getClass();
- LOG.warning(msg);
- call.callback.error(FederationErrors.newFederationError(
- FederationError.Code.UNDEFINED_CONDITION, msg));
- }
- }
-
- // Clear call's reference to callback, otherwise callback only
- // becomes eligible for GC once the timeout expires, because
- // timeoutExecutor holds on to the call object till then, even
- // though we cancelled the timeout.
- call.callback = null;
- }
- }
-
- /**
- * Process IQ request stanzas. This encompasses XMPP disco, submit and history
- * requests/responses, and get/post signer info requests/responses.
- */
- private void processIqGetSet(IQ iq) {
- Element body = iq.getChildElement();
- if (body == null) {
- sendErrorResponse(iq, FederationErrors.badRequest("Malformed request, no IQ child"));
- return;
- }
-
- final String namespace = body.getQName().getNamespace().getURI();
- final boolean isIQSet;
- if (iq.getType().equals(IQ.Type.get)) {
- isIQSet = false;
- } else if (iq.getType().equals(IQ.Type.set)) {
- isIQSet = true;
- } else {
- throw new IllegalArgumentException("Can only process an IQ get/set.");
- }
- PacketCallback responseCallback = new IncomingCallback(iq);
-
- if (namespace.equals(XmppNamespace.NAMESPACE_PUBSUB)) {
- final Element pubsub = iq.getChildElement();
- final Element element = pubsub.element(isIQSet ? "publish" : "items");
-
- if (element.attributeValue("node").equals("wavelet")) {
- if (isIQSet) {
- host.processSubmitRequest(iq, responseCallback);
- } else {
- host.processHistoryRequest(iq, responseCallback);
- }
- } else if (element.attributeValue("node").equals("signer")) {
- if (isIQSet) {
- host.processPostSignerRequest(iq, responseCallback);
- } else {
- host.processGetSignerRequest(iq, responseCallback);
- }
- } else {
- sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled pubsub request");
- }
- } else if (!isIQSet) {
- switch (namespace) {
- case XmppNamespace.NAMESPACE_DISCO_INFO:
- disco.processDiscoInfoGet(iq, responseCallback);
- break;
- case XmppNamespace.NAMESPACE_DISCO_ITEMS:
- disco.processDiscoItemsGet(iq, responseCallback);
- break;
- default:
- sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ get");
- break;
- }
- } else {
- sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ set");
- }
- }
-
- /**
- * Processes Message stanzas. This encompasses wavelet updates, update acks,
- * and ping messages.
- */
- private void processMessage(Message message) {
- if (message.getChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT) != null) {
- remote.update(message, new IncomingCallback(message));
- } else if (message.getChildElement("ping", XmppNamespace.NAMESPACE_WAVE_SERVER) != null) {
- // Respond inline to the ping.
- LOG.info("Responding to ping from: " + message.getFrom());
- Message response = XmppUtil.createResponseMessage(message);
- response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
- transport.sendPacket(response);
- } else {
- sendErrorResponse(message, FederationError.Code.BAD_REQUEST, "Unhandled message type");
- }
- }
-
- /**
- * Helper method to send generic error responses, backed onto
- * {@link #sendErrorResponse(Packet, FederationError)}.
- */
- void sendErrorResponse(Packet request, FederationError.Code code) {
- sendErrorResponse(request, FederationErrors.newFederationError(code));
- }
-
- /**
- * Helper method to send error responses, backed onto
- * {@link #sendErrorResponse(Packet, FederationError)}.
- */
- void sendErrorResponse(Packet request, FederationError.Code code, String text) {
- sendErrorResponse(request, FederationErrors.newFederationError(code, text));
- }
-
- /**
- * Send an error request to the passed incoming request.
- *
- * @param request packet request, target is derived from its to/from
- * @param error error to be contained in response
- */
- void sendErrorResponse(Packet request, FederationError error) {
- if (error.getErrorCode() == FederationError.Code.OK) {
- throw new IllegalArgumentException("Can't send an error of OK!");
- }
- sendErrorResponse(request, toPacketError(error));
- }
-
- /**
- * Send an error response to the passed incoming request. Throws
- * IllegalArgumentException if the original packet is also an error, or is of
- * the IQ result type.
- *
- * According to RFC 3920 (9.3.1), the error packet may contain the original
- * packet. However, this implementation does not include it.
- *
- * @param request packet request, to/from is inverted for response
- * @param error packet error describing error condition
- */
- void sendErrorResponse(Packet request, PacketError error) {
- if (request instanceof IQ) {
- IQ.Type type = ((IQ) request).getType();
- if (!(type.equals(IQ.Type.get) || type.equals(IQ.Type.set))) {
- throw new IllegalArgumentException("May only return an error to IQ get/set, not: " + type);
- }
- } else if (request instanceof Message) {
- Message message = (Message) request;
- if (message.getType().equals(Message.Type.error)) {
- throw new IllegalArgumentException("Can't return an error to another message error");
- }
- } else {
- throw new IllegalArgumentException("Unexpected Packet subclass, expected Message/IQ: "
- + request.getClass());
- }
-
- LOG.fine("Sending error condition in response to " + request.getID() + ": "
- + error.getCondition().name());
-
- // Note that this does not include the original packet; just the ID.
- final Packet response = XmppUtil.createResponsePacket(request);
- response.setError(error);
-
- transport.sendPacket(response);
- }
-
- /**
- * Convert a FederationError instance to a PacketError. This may return
- * <undefined-condition> if the incoming error can't be understood.
- *
- * @param error the incoming error
- * @return a generated PacketError instance
- * @throws IllegalArgumentException if the OK error code is given
- */
- private static PacketError toPacketError(FederationError error) {
- Preconditions.checkArgument(error.getErrorCode() != FederationError.Code.OK);
-
- String tag = error.getErrorCode().name().toLowerCase().replace('_', '-');
- PacketError.Condition condition;
- try {
- condition = PacketError.Condition.fromXMPP(tag);
- } catch (IllegalArgumentException e) {
- condition = PacketError.Condition.undefined_condition;
- LOG.warning("Did not understand error condition, defaulting to: " + condition.name());
- }
- PacketError result = new PacketError(condition);
- if (error.hasErrorMessage()) {
- // TODO(thorogood): Hide this behind a flag so we don't always broadcast error cases.
- result.setText(error.getErrorMessage(), "en");
- }
- return result;
- }
-
- /**
- * Convert a PacketError instance to an internal FederationError. This may
- * return an error code of UNDEFINED_CONDITION if the incoming error can't be
- * understood.
- *
- * @param error the incoming PacketError
- * @return the generated FederationError instance
- */
- private static FederationError toFederationError(PacketError error) {
- String tag = error.getCondition().name().toUpperCase().replace('-', '_');
- FederationError.Code code;
- try {
- code = FederationError.Code.valueOf(tag);
- } catch (IllegalArgumentException e) {
- code = FederationError.Code.UNDEFINED_CONDITION;
- }
- FederationError.Builder builder = FederationError.newBuilder().setErrorCode(code);
- if (error.getText() != null) {
- builder.setErrorMessage(error.getText());
- }
- return builder.build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java
deleted file mode 100644
index 7656a3b..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-/**
- * Namespace definitions for the XMPP package.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-final class XmppNamespace {
-
- // Namespace definitions for packet types
- static final String NAMESPACE_XMPP_RECEIPTS = "urn:xmpp:receipts";
- static final String NAMESPACE_DISCO_INFO = "http://jabber.org/protocol/disco#info";
- static final String NAMESPACE_DISCO_ITEMS = "http://jabber.org/protocol/disco#items";
- static final String NAMESPACE_PUBSUB = "http://jabber.org/protocol/pubsub";
- static final String NAMESPACE_PUBSUB_EVENT = "http://jabber.org/protocol/pubsub#event";
- static final String NAMESPACE_WAVE_SERVER = "http://waveprotocol.org/protocol/0.2/waveserver";
-
- /**
- * Uninstantiable class.
- */
- private XmppNamespace() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java
deleted file mode 100644
index 7ca2971..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
-import org.waveprotocol.wave.model.id.IdURIEncoderDecoder;
-import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Common utility code for XMPP packet generation and parsing.
- */
-public class XmppUtil {
- private static final AtomicLong idSequenceNo = new AtomicLong(0);
- private static final Random random = new SecureRandom();
-
- public static final IdURIEncoderDecoder waveletNameCodec =
- new IdURIEncoderDecoder(new JavaUrlCodec());
-
- // If non-null, this fake unique ID will be returned from generateUniqueId()
- // rather than a random base64 string.
- @VisibleForTesting
- public static String fakeUniqueId = null;
-
- // Alternately, and better, this callable will be called each time an ID is needed, if non-null.
- @VisibleForTesting
- static Callable<String> fakeIdGenerator = null;
-
- private XmppUtil() {
- }
-
- /**
- * Helper method to translate from the XMPP package (1.4 without generics) to
- * type-safe element lists.
- */
- @SuppressWarnings({"cast", "unchecked", "rawtypes"})
- public static List<Element> toSafeElementList(List elements) {
- return (List<Element>) elements;
- }
-
- /**
- * Checked exception thrown by signer conversion code.
- */
- public static class UnknownSignerType extends Exception {
- public UnknownSignerType(String algorithm) {
- super(algorithm);
- }
-
- public UnknownSignerType(String algorithm, Throwable stacked) {
- super(algorithm, stacked);
- }
- }
-
- /**
- * Convert the signer information to XML and place the result within the
- * passed Element. This method should never fail.
- */
- public static void protocolSignerInfoToXml(ProtocolSignerInfo signerInfo, Element parent) {
- Element signature = parent.addElement("signature", XmppNamespace.NAMESPACE_WAVE_SERVER);
- signature.addAttribute("domain", signerInfo.getDomain());
- ProtocolSignerInfo.HashAlgorithm hashValue = signerInfo.getHashAlgorithm();
-
- signature.addAttribute("algorithm", hashValue.name());
- for (ByteString cert : signerInfo.getCertificateList()) {
- signature.addElement("certificate").addCDATA(Base64Util.encode(cert));
- }
- }
-
- /**
- * Convert the given Element to a signer information XML element.
- *
- * @throws UnknownSignerType when the given hash algorithm is not understood
- */
- public static ProtocolSignerInfo xmlToProtocolSignerInfo(Element signature)
- throws UnknownSignerType {
- ProtocolSignerInfo.HashAlgorithm hash;
- String algorithm = signature.attributeValue("algorithm").toUpperCase();
- try {
- hash = ProtocolSignerInfo.HashAlgorithm.valueOf(algorithm);
- } catch (IllegalArgumentException e) {
- throw new UnknownSignerType(algorithm, e);
- }
-
- ProtocolSignerInfo.Builder builder = ProtocolSignerInfo.newBuilder();
- builder.setHashAlgorithm(hash);
- builder.setDomain(signature.attributeValue("domain"));
- for (Element certElement : toSafeElementList(signature.elements("certificate"))) {
- builder.addCertificate(Base64Util.decode(certElement.getText()));
- }
- return builder.build();
- }
-
- /**
- * Convenience method to create a response {@link Message} instance based on
- * the passed request. Simply returns a new message instance with the same ID,
- * but with inverse to/from addresses.
- *
- * @param request the request message
- * @return the new response message
- */
- public static Message createResponseMessage(Message request) {
- Message response = new Message();
- response.setID(request.getID());
- response.setTo(request.getFrom());
- response.setFrom(request.getTo());
- return response;
- }
-
- /**
- * Convenience method to create a response {@link Packet} implementation from
- * the given source packet. This will return either an {@link IQ} or
- * {@link Message} depending on the passed type.
- *
- * @param request the request message
- * @return the new response message
- */
- public static Packet createResponsePacket(Packet request) {
- if (request instanceof Message) {
- return createResponseMessage((Message) request);
- } else if (request instanceof IQ) {
- return IQ.createResultIQ((IQ) request);
- } else {
- throw new IllegalArgumentException("Can't respond to unsupported packet type: "
- + request.getClass());
- }
- }
-
- /**
- * Generate a unique string identifier for use in stanzas.
- *
- * @return unique string identifier
- */
- public static String generateUniqueId() {
- if (fakeIdGenerator != null) {
- try {
- return fakeIdGenerator.call();
- } catch (Exception e) {
- // This is used in tests only.
- throw new RuntimeException(e);
- }
- }
- // TODO(arb): deprecate this.
- if (fakeUniqueId != null) {
- return fakeUniqueId;
- }
-
- // Generate a base64 ID based on raw bytes.
- byte[] bytes = ByteBuffer.allocate(16)
- .putLong(random.nextLong()).putLong(idSequenceNo.incrementAndGet()).array();
- return Base64Util.encode(bytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java
index c1bd5f0..6868ee3 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java
@@ -19,6 +19,7 @@
package org.waveprotocol.box.server.persistence.file;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.AccountStore;
import org.waveprotocol.box.server.persistence.AccountStoreTestBase;
@@ -42,7 +43,7 @@ public class AccountStoreTest extends AccountStoreTestBase {
@Override
protected AccountStore newAccountStore() {
return new FileAccountStore(
- ConfigFactory.parseString("core.account_store_directory : " + path.getAbsolutePath()));
+ ConfigFactory.parseMap (ImmutableMap.of("core.account_store_directory", path.getAbsolutePath())));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java
index 6ace3ee..53a68d3 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java
@@ -19,6 +19,7 @@
package org.waveprotocol.box.server.persistence.file;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.AttachmentStore;
import org.waveprotocol.box.server.persistence.AttachmentStoreTestBase;
@@ -41,7 +42,7 @@ public class AttachmentStoreTest extends AttachmentStoreTestBase {
@Override
protected AttachmentStore newAttachmentStore() {
return new FileAttachmentStore(
- ConfigFactory.parseString("core.attachment_store_directory : " + path.getAbsolutePath()));
+ ConfigFactory.parseMap (ImmutableMap.of("core.attachment_store_directory", path.getAbsolutePath())));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java
index 0065d80..c10a774 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java
@@ -19,6 +19,7 @@
package org.waveprotocol.box.server.persistence.file;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.CertPathStoreTestBase;
import org.waveprotocol.wave.crypto.CertPathStore;
@@ -46,7 +47,8 @@ public class CertPathStoreTest extends CertPathStoreTestBase {
@Override
protected CertPathStore newCertPathStore() {
- return new FileSignerInfoStore(ConfigFactory.parseString("core.signer_info_store_directory : " + path.getAbsolutePath()));
+ return new FileSignerInfoStore(ConfigFactory.parseMap (
+ ImmutableMap.of("core.signer_info_store_directory", path.getAbsolutePath())));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java
index 1bc18ee..3dbfaef 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java
@@ -21,6 +21,7 @@ package org.waveprotocol.box.server.persistence.file;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.DeltaStoreTestBase;
import org.waveprotocol.box.server.waveserver.DeltaStore;
@@ -55,8 +56,8 @@ public class DeltaStoreTest extends DeltaStoreTestBase {
@Override
protected DeltaStore newDeltaStore() {
- return new FileDeltaStore(ConfigFactory.parseString("core.delta_store_directory : " + path
- .getAbsolutePath()));
+ return new FileDeltaStore(ConfigFactory.parseMap (
+ ImmutableMap.of("core.delta_store_directory", path.getAbsolutePath())));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java
deleted file mode 100644
index 5824cc8..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.base.Function;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapMaker;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Tiny MockDisco class that wraps XmppDisco.
- *
- * Use {@link #testInjectInDomainToJidMap} to configure custom immediate responses, otherwise
- * responses will be placed on a pending queue.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class MockDisco extends XmppDisco {
-
- private static final int FAIL_EXPIRY_SECS = 5 * 60;
- private static final int SUCCESS_EXPIRY_SECS = 2 * 60 * 60;
- private static final int DISCO_EXPIRY_HOURS = 6;
-
- public static final Config config;
-
- static {
- Map<String, Object> props = new HashMap<>();
- props.put("federation.xmpp_server_description", "Wave in a Box");
- props.put("federation.disco_info_category", "collaboration");
- props.put("federation.disco_info_type", "apache-wave");
- props.put("federation.xmpp_disco_failed_expiry", FAIL_EXPIRY_SECS + "s");
- props.put("federation.xmpp_disco_successful_expiry", SUCCESS_EXPIRY_SECS + "s");
- props.put("federation.disco_expiration", DISCO_EXPIRY_HOURS + "h");
-
- config = ConfigFactory.parseMap(props);
- }
-
- MockDisco() {
- super(config);
- }
-
- public static class PendingMockDisco {
- public final String remoteDomain;
- public final Queue<SuccessFailCallback<String, String>> callbacks = Lists.newLinkedList();
-
- private PendingMockDisco(String remoteDomain) {
- this.remoteDomain = remoteDomain;
- }
-
- private void addCallback(SuccessFailCallback<String, String> callback) {
- callbacks.add(callback);
- }
- }
-
- public LoadingCache<String, PendingMockDisco> pending = CacheBuilder.newBuilder()
- .build(new CacheLoader<String, PendingMockDisco>() {
- @Override
- public PendingMockDisco load(String domain) {
- return new PendingMockDisco(domain);
- }
- });
-
- @Override
- public void discoverRemoteJid(String remoteDomain, SuccessFailCallback<String, String> callback) {
- if (isDiscoRequestAvailable(remoteDomain)) {
- // Note: tiny race condition in case this is purged between above and
- // below, but since this is only used in tests, we can probably ignore it.
- super.discoverRemoteJid(remoteDomain, callback);
- } else {
- try {
- pending.get(remoteDomain).addCallback(callback);
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java
deleted file mode 100644
index 588fbe8..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- * Dummy implementation of {@link OutgoingPacketTransport} that stores packets
- * being sent over-the-wire. May optionally accept a {@link Router} instance
- * where packets are automatically forwarded.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class MockOutgoingPacketTransport implements OutgoingPacketTransport {
-
- public interface Router {
- public void route(Packet packet);
- }
-
- // wrapped router object, if null then packets are not routed
- public Router router;
-
- // pending outgoing packets
- public final Queue<Packet> packets = new LinkedList<Packet>();
-
- // last packet sent
- public Packet lastPacketSent = null;
-
- // total number of packets sent here
- public long packetsSent = 0;
-
- public MockOutgoingPacketTransport() {
- router = null;
- }
-
- public MockOutgoingPacketTransport(Router router) {
- this.router = router;
- }
-
- @Override
- public void sendPacket(Packet packet) {
- if (!packets.offer(packet)) {
- throw new IllegalStateException("Can't offer packet to queue: " + packets);
- }
- lastPacketSent = packet;
- packetsSent++;
-
- if (router != null) {
- router.route(packet);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java
deleted file mode 100644
index 2668a74..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import org.xmpp.packet.IQ;
-import org.joda.time.DateTimeUtils;
-import org.waveprotocol.wave.federation.FederationErrors;
-
-
-import junit.framework.TestCase;
-
-/**
- * Naive unit tests for RemoteDisco: forced results and time-to-live expiry.
- * Integration testing is performed in {@link XmppDiscoTest}.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-
-public class RemoteDiscoTest extends TestCase {
-
- private final static String REMOTE_DOMAIN = "acmewave.com";
- private final static String REMOTE_JID = "wave.acmewave.com";
- private static final int SUCCESS_EXPIRY_SECS = 600;
- private static final int FAIL_EXPIRY_SECS = 120;
- private RemoteDisco remoteDisco;
- private SuccessFailCallback<String, String> callback;
-
- protected void setUp() throws Exception {
- super.setUp();
- XmppManager manager = mock(XmppManager.class);
- when(manager.createRequestIQ(eq(REMOTE_DOMAIN))).thenReturn(new IQ());
-
- DateTimeUtils.setCurrentMillisFixed(0);
- remoteDisco = new RemoteDisco(manager, REMOTE_DOMAIN, FAIL_EXPIRY_SECS,
- SUCCESS_EXPIRY_SECS);
- callback = mockDiscoCallback();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- DateTimeUtils.setCurrentMillisSystem();
- }
-
- @SuppressWarnings("unchecked")
- private SuccessFailCallback<String, String> mockDiscoCallback() {
- return mock(SuccessFailCallback.class);
- }
-
- /**
- * Verifies that a RemoteDisco constructed with a known JID reports success only.
- */
- public void testForcedSuccess() {
- RemoteDisco remoteDisco = new RemoteDisco(REMOTE_DOMAIN, REMOTE_JID, null);
-
- SuccessFailCallback<String, String> callback = mockDiscoCallback();
- remoteDisco.discoverRemoteJID(callback);
- verify(callback).onSuccess(eq(REMOTE_JID));
- verify(callback, never()).onFailure(anyString());
- }
-
- /**
- * Verifies that a RemoteDisco constructed with an error fails every lookup.
- */
- public void testForcedFailure() {
- RemoteDisco remoteDisco = new RemoteDisco(REMOTE_DOMAIN, null,
- FederationErrors.badRequest("irrelevant"));
-
- SuccessFailCallback<String, String> callback = mockDiscoCallback();
- remoteDisco.discoverRemoteJID(callback);
- verify(callback, never()).onSuccess(anyString());
- verify(callback).onFailure(anyString());
- callback = mockDiscoCallback();
- remoteDisco.discoverRemoteJID(callback);
- verify(callback, never()).onSuccess(anyString());
- verify(callback).onFailure(anyString());
- }
-
- /**
- * Checks that a successful disco result expires around SUCCESS_EXPIRY_SECS.
- */
- public void testTimeToLive() {
- remoteDisco.discoverRemoteJID(callback);
- assertFalse(remoteDisco.ttlExceeded());
- remoteDisco.finish(REMOTE_JID, null); // successful disco
- assertFalse(remoteDisco.ttlExceeded());
- tick((SUCCESS_EXPIRY_SECS - 1) * 1000); // not quite expired
- assertFalse(remoteDisco.ttlExceeded());
- tick(20 * 1000); // should now be expired
- assertTrue(remoteDisco.ttlExceeded());
- }
-
- /**
- * Checks that a failed disco result expires around FAIL_EXPIRY_SECS.
- */
- public void testTimeToLiveDiscoFailed() {
- remoteDisco.discoverRemoteJID(callback);
- assertFalse(remoteDisco.ttlExceeded());
- remoteDisco.finish(null, FederationErrors.badRequest("test failure")); // failed disco
- assertFalse(remoteDisco.ttlExceeded());
- tick((FAIL_EXPIRY_SECS - 1) * 1000); // not quite expired
- assertFalse(remoteDisco.ttlExceeded());
- tick(20 * 1000); // should now be expired
- assertTrue(remoteDisco.ttlExceeded());
- }
-
- /**
- * Advances the fixed Joda-Time clock used by this test.
- *
- * @param millis milliseconds to add to the current fixed time
- */
- private void tick(int millis) {
- DateTimeUtils.setCurrentMillisFixed(DateTimeUtils.currentTimeMillis() + millis);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java
----------------------------------------------------------------------
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java
deleted file mode 100644
index fc928a6..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-
-
-import com.typesafe.config.ConfigFactory;
-import junit.framework.TestCase;
-
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-import org.xmpp.packet.PacketError;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.xmpp.MockOutgoingPacketTransport.Router;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test round-trips between two XmppManager instances pointed at each other.
- *
- * This class is not intended to test specific calls; it primarily tests
- * reliable calls made by the manager along with error handling. Any specific
- * call coverage is purely a side-effect of wanting real test data.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-
-public class RoundTripTest extends TestCase {
-
- private static final String SERVER1_DOMAIN = "google.com";
- private static final String SERVER2_DOMAIN = "acmewave.com";
-
- private static final int PACKET_TIMEOUT = 10;
-
- private static class ServerInstances {
- final String jid;
- final XmppManager manager;
- final XmppFederationHost host;
- final XmppFederationRemote remote;
- final XmppDisco disco;
- final MockOutgoingPacketTransport transport;
-
- ServerInstances(String domain, MockOutgoingPacketTransport.Router router) {
- // Mocks.
- host = mock(XmppFederationHost.class);
- remote = mock(XmppFederationRemote.class);
- disco = mock(XmppDisco.class);
-
- // 'Real' instantiated classes!
- jid = "wave." + domain;
- transport = new MockOutgoingPacketTransport(router);
-
- final Map<String, Object> props = new HashMap<>();
- props.put("federation.xmpp_disco_successful_expiry", "6s");
- props.put("federation.xmpp_jid", jid);
- manager = new XmppManager(
- host, remote, disco, transport, ConfigFactory.parseMap(props));
-
- // Verify manager callback.
- verify(host).setManager(eq(manager));
- verify(remote).setManager(eq(manager));
- verify(disco).setManager(eq(manager));
- }
- }
-
- private ServerInstances server1;
- private ServerInstances server2;
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
-
- server1 = new ServerInstances(SERVER1_DOMAIN, new Router() {
- @Override
- public void route(Packet packet) {
- server2.manager.receivePacket(packet);
- }
- });
- server2 = new ServerInstances(SERVER2_DOMAIN, new Router() {
- @Override
- public void route(Packet packet) {
- server1.manager.receivePacket(packet);
- }
- });
- }
-
- /**
- * Test the simple case of packet send/receive by sending a malformed request.
- */
- public void testPacketSendMalformedFailure() {
- Packet packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("irrelevant");
- packet.setTo(server2.jid);
-
- PacketCallback callback = mock(PacketCallback.class);
-
- // Send an outgoing packet from server1 -> server2
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
- assertEquals("First transport should have a single packet pending",
- 1, server1.transport.packets.size());
- assertEquals("First transport should have unmodified outgoing packet",
- packet, server1.transport.packets.peek());
-
- // Confirm that server2 sent back an error
- assertEquals("Second transport should have a single packet pending",
- 1, server2.transport.packets.size());
- assertNotNull("Second transport should be an error packet",
- server2.transport.packets.peek().getError());
-
- // Ensure the error is interpreted correctly and returned to the callback
- ArgumentCaptor<FederationError> errorCaptor = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(errorCaptor.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals("Invalid packet was sent, error should be BAD_REQUEST",
- FederationError.Code.BAD_REQUEST, errorCaptor.getValue().getErrorCode());
- }
-
- /**
- * Test that a request left unanswered fails its callback with
- * REMOTE_SERVER_TIMEOUT, and that a late response is then dropped.
- */
- public void testPacketTimeout() throws Exception {
- int TIMEOUT_DELAY = 0;
- int TIMEOUT_WAIT = 5;
-
- // Send a valid packet, so it is received by the remote Disco mock, but not processed.
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("disco");
- packet.setTo(server2.jid);
- packet.setType(IQ.Type.get);
- packet.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
-
- PacketCallback callback = mock(PacketCallback.class);
- final CountDownLatch finished = new CountDownLatch(1);
-
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- FederationError error = (FederationError) invocation.getArguments()[0];
- assertEquals(FederationError.Code.REMOTE_SERVER_TIMEOUT, error.getErrorCode());
- finished.countDown();
- return null;
- }
- }).when(callback).error(any(FederationError.class));
-
- server1.manager.send(packet, callback, TIMEOUT_DELAY);
- assertTrue(finished.await(TIMEOUT_WAIT, TimeUnit.SECONDS));
- verify(callback, never()).run(any(Packet.class));
-
- // Mockito says never to reset a mock, but we have to as the callback is
- // already used deep in XmppManager.
- Mockito.reset(callback);
-
- // For fun, process the request by the remote disco and return a response
- // that will never be processed.
- ArgumentCaptor<PacketCallback> server2Callback = ArgumentCaptor.forClass(PacketCallback.class);
- verify(server2.disco).processDiscoItemsGet(eq(packet), server2Callback.capture());
- XmppDisco realDisco = new XmppDisco(MockDisco.config);
- realDisco.setManager(server2.manager);
- realDisco.processDiscoItemsGet(packet, server2Callback.getValue());
-
- // Confirm disco on server2 has replied with a packet.
- assertEquals(1, server2.transport.packets.size());
- assertEquals(null, server2.transport.packets.peek().getError());
-
- // Confirm, however, that the packet is dropped by the first manager (no pending call!).
- verifyZeroInteractions(callback);
- }
-
- /**
- * Test that an arbitrary error response is properly returned when generated
- * by the second server. Also ensure that the second server can't invoke its
- * callback twice.
- */
- public void testErrorResponse() {
- FederationError.Code TEST_CODE = FederationError.Code.NOT_AUTHORIZED;
- PacketError.Condition TEST_CONDITION = PacketError.Condition.not_authorized;
-
- // Send a valid packet, so it is received by the remote Disco mock, but not
- // explicitly processed.
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("disco");
- packet.setTo(server2.jid);
- packet.setType(IQ.Type.get);
- packet.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
-
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
-
- // Accept the disco request and return TEST_CODE error.
- ArgumentCaptor<PacketCallback> server2Callback = ArgumentCaptor.forClass(PacketCallback.class);
- verify(server2.disco).processDiscoItemsGet(eq(packet), server2Callback.capture());
- server2Callback.getValue().error(FederationErrors.newFederationError(TEST_CODE));
-
- // Then try to complete the request again; this must throw IllegalStateException.
- IQ fakeResponse = IQ.createResultIQ(packet);
- try {
- server2Callback.getValue().run(fakeResponse);
- fail("Should not be able to invoke callback twice");
- } catch (IllegalStateException e) {
- // expected: the callback may only be invoked once
- }
-
- // Check the outgoing packet log.
- assertEquals(1, server2.transport.packets.size());
- Packet errorResponse = server2.transport.packets.peek();
- PacketError error = errorResponse.getError();
- assertNotNull(error);
- assertEquals(TEST_CONDITION, error.getCondition());
-
- // Assert that the error response does *not* include the original packet.
- assertTrue(errorResponse instanceof IQ);
- IQ errorIQ = (IQ) errorResponse;
- assertEquals(null, errorIQ.getChildElement());
-
- // Confirm that the error is received properly on the first server.
- ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(returnedError.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals(TEST_CODE, returnedError.getValue().getErrorCode());
-
- // If we push the error again, it should be dropped. Note that resetting the
- // callback here is the simplest way to test this, since it is already
- // registered inside the manager.
- reset(callback);
- server1.manager.receivePacket(errorResponse);
- verifyZeroInteractions(callback);
- }
-
- /**
- * Test that an unhandled error (e.g. <forbidden>) is translated to
- * UNDEFINED_CONDITION before being returned to the mocked callback.
- */
- public void testUnhandledErrorResponse() {
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("foo");
- packet.setTo(server2.jid);
-
- // Disable routing so we can intercept the packet.
- server1.transport.router = null;
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
-
- // Generate an explicit error <forbidden>.
- IQ errorPacket = IQ.createResultIQ(packet);
- errorPacket.setError(PacketError.Condition.forbidden);
- server1.manager.receivePacket(errorPacket);
-
- // Confirm that <forbidden> is transformed to UNDEFINED_CONDITION.
- ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(returnedError.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals(FederationError.Code.UNDEFINED_CONDITION, returnedError.getValue().getErrorCode());
- }
-
- /**
- * Test that packet IDs cannot be re-used while in-flight, and also that they
- * may be re-used later.
- */
- public void testReusePacketId() throws Exception {
- int REUSE_FAIL_WAIT = 5;
-
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("foo-packet");
- packet.setTo(server2.jid);
-
- // Disable routing so we can intercept the packet.
- server1.transport.router = null;
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
- assertEquals(1, server1.transport.packets.size());
- assertEquals(packet, server1.transport.packets.poll());
-
- // Try sending another packet with the same ID - must fail (called back in
- // another thread)!
- PacketCallback invalidCallback = mock(PacketCallback.class);
- final CountDownLatch finished = new CountDownLatch(1);
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- FederationError error = (FederationError) invocation.getArguments()[0];
- assertEquals(FederationError.Code.UNDEFINED_CONDITION, error.getErrorCode());
- finished.countDown();
- return null;
- }
- }).when(invalidCallback).error(any(FederationError.class));
-
- server1.manager.send(packet, invalidCallback, PACKET_TIMEOUT);
- assertTrue(finished.await(REUSE_FAIL_WAIT, TimeUnit.SECONDS));
- verify(invalidCallback, never()).run(any(Packet.class));
-
- // Generate an explicit success response.
- IQ successPacket = IQ.createResultIQ(packet);
- server1.manager.receivePacket(successPacket);
- verify(callback).run(eq(successPacket));
- verify(callback, never()).error(any(FederationError.class));
-
- // Again, re-use the ID: should succeed since it is cleared from callbacks.
- PacketCallback zeroCallback = mock(PacketCallback.class);
- server1.manager.send(packet, zeroCallback, PACKET_TIMEOUT);
- assertEquals(1, server1.transport.packets.size());
- assertEquals(packet, server1.transport.packets.poll());
- verifyZeroInteractions(zeroCallback);
- }
-
- /**
- * Test that if (e.g.) an IQ is sent, then an IQ must be returned as a
- * response. If a Message is returned instead, this should invoke an error
- * callback.
- */
- public void testDropInvalidResponseType() throws Exception {
- IQ packet = server1.manager.createRequestIQ(server2.jid);
-
- // Disable routing so we can intercept the packet.
- server1.transport.router = null;
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
-
- // Generate an explicit Message receipt.
- Message response = new Message();
- response.setTo(packet.getFrom());
- response.setID(packet.getID());
- response.setFrom(packet.getTo());
- response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
- server1.manager.receivePacket(response);
-
- // Confirm that an error callback is invoked.
- ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(returnedError.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals(FederationError.Code.UNDEFINED_CONDITION, returnedError.getValue().getErrorCode());
-
- // Confirm that sending a correct response now does nothing.
- reset(callback);
- IQ correctResponse = IQ.createResultIQ(packet);
- server1.manager.receivePacket(correctResponse);
- verifyZeroInteractions(callback);
- }
-
-}