You are viewing a plain text version of this content. The canonical link for it is here.
Posted to wave-commits@incubator.apache.org by yu...@apache.org on 2016/05/06 12:06:50 UTC
[3/3] incubator-wave git commit: WAVE-438 - Removes XMPP federation
implementation along with relevant resources and unit tests. Fixes issue with
a test when running with Java1.8.
WAVE-438 - Removes XMPP federation implementation along with relevant resources and unit tests.
Fixes issue with a test when running with Java1.8.
Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/ed4feb70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/ed4feb70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/ed4feb70
Branch: refs/heads/master
Commit: ed4feb7018f35242fe5ecb00bf98b9462d7836ff
Parents: 29f3d34
Author: Yuri Zelikov <yu...@apache.org>
Authored: Thu May 5 14:50:30 2016 +0300
Committer: Yuri Zelikov <yu...@apache.org>
Committed: Fri May 6 14:29:26 2016 +0300
----------------------------------------------------------------------
README.md | 11 -
wave/build.gradle | 7 +-
wave/config/reference.conf | 31 +-
wave/config/server-config.xml | 65 --
.../box/server/gxp/AuthenticationPage.gxp | 1 -
.../org/waveprotocol/box/server/ServerMain.java | 14 +-
.../server/executor/ExecutorAnnotations.java | 5 -
.../box/server/executor/ExecutorsModule.java | 9 -
.../wave/federation/xmpp/Base64Util.java | 67 --
.../xmpp/ComponentPacketTransport.java | 149 ----
.../federation/xmpp/IncomingPacketHandler.java | 38 --
.../xmpp/OutgoingPacketTransport.java | 38 --
.../wave/federation/xmpp/PacketCallback.java | 34 -
.../wave/federation/xmpp/RemoteDisco.java | 430 ------------
.../federation/xmpp/SuccessFailCallback.java | 30 -
.../wave/federation/xmpp/XmppDisco.java | 212 ------
.../federation/xmpp/XmppFederationHost.java | 446 ------------
.../xmpp/XmppFederationHostForDomain.java | 173 -----
.../federation/xmpp/XmppFederationModule.java | 61 --
.../federation/xmpp/XmppFederationRemote.java | 633 -----------------
.../xmpp/XmppFederationTransport.java | 50 --
.../wave/federation/xmpp/XmppManager.java | 474 -------------
.../wave/federation/xmpp/XmppNamespace.java | 43 --
.../wave/federation/xmpp/XmppUtil.java | 182 -----
.../persistence/file/AccountStoreTest.java | 3 +-
.../persistence/file/AttachmentStoreTest.java | 3 +-
.../persistence/file/CertPathStoreTest.java | 4 +-
.../server/persistence/file/DeltaStoreTest.java | 5 +-
.../wave/federation/xmpp/MockDisco.java | 108 ---
.../xmpp/MockOutgoingPacketTransport.java | 73 --
.../wave/federation/xmpp/RemoteDiscoTest.java | 138 ----
.../wave/federation/xmpp/RoundTripTest.java | 378 -----------
.../wave/federation/xmpp/XmppDiscoTest.java | 674 -------------------
.../xmpp/XmppFederationHostForDomainTest.java | 329 ---------
.../xmpp/XmppFederationRemoteTest.java | 497 --------------
.../SimpleWantedEvaluationSetTest.java | 2 +-
36 files changed, 16 insertions(+), 5401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b33916a..4572e41 100644
--- a/README.md
+++ b/README.md
@@ -160,17 +160,6 @@ Note:
- if a jar is unable to be unzipped with wave:extractApi then delete the jar from your cache and try again.
You may need to restart. If problem persists let the newsgroup know or create an issue on Jira.
-To config your server a default configuration is provided by reference.conf,
-this can be overwritten by application.conf with custom values.
-
-To enable federation the following must be run.
-
-To create a simple configuration run:
- `./gradlew prosody-config`
-
-To override default values pass them to the ant script.
-For example, to override wave\_server\_domain run:
-`./gradlew prosody-config -Dwave_server_domain=example.com`
Take a look at the reference.conf to learn about configuration and possible/default values.
The server can be started (on Linux/MacOS) by running
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/build.gradle
----------------------------------------------------------------------
diff --git a/wave/build.gradle b/wave/build.gradle
index 71ee962..f09b059 100644
--- a/wave/build.gradle
+++ b/wave/build.gradle
@@ -30,7 +30,7 @@ apply plugin: 'com.google.protobuf'
/* Meta Data Info */
def title = 'Apache Wave Server'
def vendor = 'The Apache Software Foundation'
-version = "0.4.1"
+version = "0.4.2"
mainClassName = "org.waveprotocol.box.server.ServerMain"
applicationDefaultJvmArgs = [
"-Xmx1024M",
@@ -166,9 +166,6 @@ dependencies {
[group: "org.eclipse.jetty.websocket", name: "websocket-common", version: "9.2.14.v20151106"], // [?, ?]
[group: "org.eclipse.jetty.websocket", name: "websocket-server", version: "9.2.14.v20151106"], // [?, ?]
[group: "org.eclipse.jetty.websocket", name: "websocket-servlet", version: "9.2.14.v20151106"], // [?, ?]
- [group: "org.gnu.inet", name: "libidn", version: "1.15"], // [?, ?]
- [group: "org.igniterealtime", name: "tinder", version: "1.2.3"], // [1/2016, 6/2016]
- [group: "org.igniterealtime.whack", name: "core", version: "2.0.0"], // [1/2016, 6/2016]
[group: "org.jdom", name: "jdom", version: "1.1.3"], // [?, ?]
[group: "org.mongodb", name: "mongo-java-driver", version: "2.11.2"], // [?, ?]
[group: "org.slf4j", name: "slf4j-api", version: "1.6.1"], // [?, ?]
@@ -507,8 +504,6 @@ testGwt.mustRunAfter compileGwt, testLarge
testMongo.mustRunAfter compileJava, test
testLarge.mustRunAfter test
-ant.importBuild 'config/server-config.xml'
-
//=============================================================================
// Custom UberJar Implementation
// Author Note: this custom implementation should be replaced by the shadow
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/config/reference.conf
----------------------------------------------------------------------
diff --git a/wave/config/reference.conf b/wave/config/reference.conf
index 425a576..30cf51f 100644
--- a/wave/config/reference.conf
+++ b/wave/config/reference.conf
@@ -207,13 +207,11 @@ security {
clientauth_cert_domain : ""
}
+# Please note that currently Wave in a Box server has no Federation implementation.
federation {
# Federation Configuration for the Wave in a Box server
enable_federation : false
- # These will probably need to be changed
- xmpp_server_secret : secret
-
# The PKCS#8-PEM-encoded private key.
certificate_private_key : "local.net.key"
@@ -228,33 +226,6 @@ federation {
# The domain for which the certificate was issued.
certificate_domain : "local.net"
- xmpp_component_name : wave
-
- # This server's local JID
- xmpp_jid : "wave.local.net"
-
- xmpp_server_description : "Wave in a Box"
-
- disco_info_category : "collaboration"
-
- disco_info_type : "apache-wave"
-
- xmpp_server_hostname : "local.net"
-
- xmpp_server_component_port : 5275
-
- # How long to cache failed disco results.
- xmpp_disco_failed_expiry : 300s
-
- # How long to cache successful disco results.
- xmpp_disco_successful_expiry : 7200s
-
- disco_expiration : 6h
-
- # Set XMPP_SERVER_IP to localhost if the XMPP and Wave in a Box servers are
- # running on the same host
- xmpp_server_ip : localhost
-
# Set true to disable the verification of signed deltas
waveserver_disable_verification : true
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/config/server-config.xml
----------------------------------------------------------------------
diff --git a/wave/config/server-config.xml b/wave/config/server-config.xml
deleted file mode 100644
index deb5f3b..0000000
--- a/wave/config/server-config.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<!--
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- -->
-<project name="server config" basedir="../" default="prosody-config">
- <description>Creates the server configuration file.</description>
-
- <property name="wave_server_domain" value="local.net" />
- <property name="xmpp_server_secret" value="opensesame" />
- <property name="certificate_private_key" value="${wave_server_domain}.key" />
- <property name="certificate_files" value="${wave_server_domain}.crt,sub.class1.server.ca.pem,ca.pem" />
- <property name="certificate_domain" value="${wave_server_domain}" />
- <property name="xmpp_component_name" value="wave" />
- <property name="xmpp_jid" value="${xmpp_component_name}.${wave_server_domain}" />
- <property name="xmpp_server_description" value="&quot;Wave in a Box&quot;" />
- <property name="xmpp_server_hostname" value="${wave_server_domain}" />
- <property name="xmpp_server_component_port" value="5275" />
- <property name="xmpp_server_to_server_port" value="5269" />
- <property name="xmpp_server_ping" value="wavesandbox.com" />
- <property name="xmpp_server_ip" value="${xmpp_server_hostname}" />
- <property name="waveserver_disable_verification" value="false" />
- <property name="waveserver_disable_signer_verification" value="false" />
-
-
- <target name="prosody-config"
- description="Run to create the prosody configuration files.
- ant -f server-config.xml prosody-config">
- <echo>Generating ${certificate_domain}.cfg.lua</echo>
- <copy file="${certificate_domain}.cfg.lua"
- tofile="${certificate_domain}.cfg.lua.old"
- overwrite="true"
- failonerror="false" />
- <copy file="prosody.cfg.lua.example" tofile="${certificate_domain}.cfg.lua" overwrite="true">
- <filterchain>
- <replacetokens>
- <token key="BASEDIR" value="${basedir}" />
- <token key="XMPP_SERVER_SECRET" value="${xmpp_server_secret}" />
- <token key="CERTIFICATE_PRIVATE_KEY" value="${certificate_private_key}" />
- <token key="CERTIFICATE_DOMAIN" value="${certificate_domain}" />
- <token key="XMPP_JID" value="${xmpp_jid}" />
- <token key="XMPP_SERVER_DESCRIPTION" value="${xmpp_server_description}" />
- <token key="XMPP_SERVER_COMPONENT_PORT" value="${xmpp_server_component_port}" />
- <token key="XMPP_SERVER_TO_SERVER_PORT" value="${xmpp_server_to_server_port}" />
- </replacetokens>
- </filterchain>
- </copy>
- <echo>Please, manually copy ${certificate_domain}.cfg.lua to your prosody configuration directory.</echo>
- <echo>E.g. sudo cp ${certificate_domain}.cfg.lua /etc/prosody/conf.d/${certificate_domain}.cfg.lua</echo>
- <echo>Additionally, ensure your ${certificate_domain} SRV record points to port ${xmpp_server_to_server_port}</echo>
- </target>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp
----------------------------------------------------------------------
diff --git a/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp b/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp
index d8791d4..d188b05 100644
--- a/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp
+++ b/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp
@@ -106,7 +106,6 @@
<li>
This project lets developers and
enterprise users run wave servers and host waves on their own hardware.
- And then share those waves with other wave servers.
</li>
</ul>
</p>
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
index d258f9c..727ad44 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
@@ -61,7 +61,6 @@ import org.waveprotocol.box.stat.StatService;
import org.waveprotocol.wave.crypto.CertPathStore;
import org.waveprotocol.wave.federation.FederationTransport;
import org.waveprotocol.wave.federation.noop.NoOpFederationModule;
-import org.waveprotocol.wave.federation.xmpp.XmppFederationModule;
import org.waveprotocol.wave.model.version.HashedVersionFactory;
import org.waveprotocol.wave.model.wave.ParticipantIdUtil;
import org.waveprotocol.wave.util.logging.Log;
@@ -108,10 +107,9 @@ public class ServerMain {
injector = injector.createChildInjector(profilingModule, executorsModule);
Config config = injector.getInstance(Config.class);
- boolean enableFederation = config.getBoolean("federation.enable_federation");
Module serverModule = injector.getInstance(ServerModule.class);
- Module federationModule = buildFederationModule(injector, enableFederation);
+ Module federationModule = buildFederationModule(injector);
Module robotApiModule = new RobotApiModule();
PersistenceModule persistenceModule = injector.getInstance(PersistenceModule.class);
Module searchModule = injector.getInstance(SearchModule.class);
@@ -140,15 +138,9 @@ public class ServerMain {
server.startWebSocketServer(injector);
}
- private static Module buildFederationModule(Injector settingsInjector, boolean enableFederation)
+ private static Module buildFederationModule(Injector settingsInjector)
throws ConfigurationException {
- Module federationModule;
- if (enableFederation) {
- federationModule = settingsInjector.getInstance(XmppFederationModule.class);
- } else {
- federationModule = settingsInjector.getInstance(NoOpFederationModule.class);
- }
- return federationModule;
+ return settingsInjector.getInstance(NoOpFederationModule.class);
}
private static void initializeServer(Injector injector, String waveDomain)
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java
index a002c76..5621610 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java
@@ -80,11 +80,6 @@ public interface ExecutorAnnotations {
public @interface RobotGatewayExecutor {
}
- @Retention(RUNTIME)
- @BindingAnnotation
- public @interface XmppExecutor {
- }
-
@BindingAnnotation
public @interface SolrExecutor {
}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java
index bfcd345..2d8e65e 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java
@@ -137,15 +137,6 @@ public class ExecutorsModule extends AbstractModule {
@Provides
@Singleton
- @XmppExecutor
- protected ScheduledExecutorService provideXmppExecutor(
- Provider<ScheduledRequestScopeExecutor> executorProvider) {
- return provideScheduledThreadPoolExecutor(executorProvider, 1, XmppExecutor.class
- .getSimpleName());
- }
-
- @Provides
- @Singleton
@SolrExecutor
protected Executor provideSolrExecutor(Provider<RequestScopeExecutor> executorProvider,
Config config) {
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java
deleted file mode 100644
index 6ab6569..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.protobuf.AbstractMessageLite;
-import com.google.protobuf.ByteString;
-
-import org.apache.commons.codec.binary.Base64;
-
-import java.nio.charset.Charset;
-
-/**
- * Utility class for encoding and decoding ByteStrings, byte arrays and encoding
- * generic protocol buffers.
- *
- * @author arb@google.com (Anthony Baxter)
- * @author thorogood@google.com (Sam Thorogood)
- */
-public final class Base64Util {
-
- // Character set for all encoding and decoding. Base64 can be correctly
- // represented using UTF-8.
- private static final Charset CHAR_SET = Charset.forName("UTF-8");
-
- /**
- * Utility class only, cannot be instantiated.
- */
- private Base64Util() {
- }
-
- public static String encode(ByteString bs) {
- return new String(Base64.encodeBase64(bs.toByteArray()), CHAR_SET);
- }
-
- public static String encode(byte[] ba) {
- return new String(Base64.encodeBase64(ba), CHAR_SET);
- }
-
- public static String encode(AbstractMessageLite message) {
- return new String(Base64.encodeBase64(message.toByteArray()), CHAR_SET);
- }
-
- public static byte[] decodeFromArray(String str) {
- return Base64.decodeBase64(str.getBytes(CHAR_SET));
- }
-
- public static ByteString decode(String str) {
- return ByteString.copyFrom(Base64.decodeBase64(str.getBytes(CHAR_SET)));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java
deleted file mode 100644
index a98dbbc..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.jivesoftware.whack.ExternalComponentManager;
-import org.xmpp.component.Component;
-import org.xmpp.component.ComponentException;
-import org.xmpp.component.ComponentManager;
-import org.xmpp.packet.JID;
-import org.xmpp.packet.Packet;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.logging.Logger;
-
-/**
- * Talks to a XMPP server using the Jabber Component Protocol (XEP-0114).
- *
- * Implements {@link OutgoingPacketTransport} allowing users to send packets,
- * and accepts an {@link IncomingPacketHandler} which can process incoming
- * packets.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class ComponentPacketTransport implements Component, OutgoingPacketTransport {
- private static final Logger LOG =
- Logger.getLogger(ComponentPacketTransport.class.getCanonicalName());
-
- private final IncomingPacketHandler handler;
- private final String componentName;
- private final String serverDomain;
- private final String serverSecret;
- private final String serverAddress;
- private final int serverPort;
-
- // Contains packets queued but not sent (while offline).
- private final Queue<Packet> queuedPackets;
-
- // Object used to lock around online/offline state changes.
- private final Object connectionLock = new Object();
-
- private ExternalComponentManager componentManager = null;
- private boolean connected = false;
-
- @Inject
- public ComponentPacketTransport(IncomingPacketHandler handler, Config config) {
- this.handler = handler;
- this.componentName = config.getString("federation.xmpp_component_name");
- this.serverDomain = config.getString("federation.xmpp_server_hostname");
- this.serverSecret = config.getString("federation.xmpp_server_secret");
- this.serverAddress = config.getString("federation.xmpp_server_ip");
- this.serverPort = config.getInt("federation.xmpp_server_component_port");
-
- queuedPackets = new LinkedList<>();
- }
-
- /**
- * Bind the component to the XMPP server.
- *
- * @throws ComponentException if the component couldn't talk to the server
- */
- public void run() throws ComponentException {
- componentManager = new ExternalComponentManager(serverAddress, serverPort);
- componentManager.setDefaultSecretKey(serverSecret);
- componentManager.setServerName(serverDomain);
-
- // Register this component with the manager.
- componentManager.addComponent(componentName, this);
- }
-
- @Override
- public void sendPacket(Packet packet) {
- synchronized (connectionLock) {
- if (connected) {
- componentManager.sendPacket(this, packet);
- } else {
- queuedPackets.add(packet);
- }
- }
- }
-
- @Override
- public String getDescription() {
- return "Wave in a Box Server";
- }
-
- @Override
- public String getName() {
- return componentName;
- }
-
- @Override
- public void initialize(JID jid, ComponentManager componentManager) {
- // TODO(thorogood): According to XEP-0114, the only valid JID here is the
- // same JID we attempt to connect to the XMPP server with.
- LOG.info("Initializing with JID: " + jid);
- }
-
- /**
- * {@inheritDoc}
- *
- * Pass the incoming on-the-wire packet onto the incoming handler.
- */
- @Override
- public void processPacket(Packet packet) {
- handler.receivePacket(packet);
- }
-
- @Override
- public void shutdown() {
- synchronized (connectionLock) {
- LOG.info("Disconnected from XMPP server.");
- componentManager = null;
- connected = false;
- }
- }
-
- @Override
- public void start() {
- synchronized (connectionLock) {
- connected = true;
- LOG.info("Connected to XMPP server with JID: " + componentName + "." + serverDomain);
-
- // Send all queued outgoing packets.
- while (!queuedPackets.isEmpty()) {
- componentManager.sendPacket(this, queuedPackets.poll());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java
deleted file mode 100644
index cb60ac0..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-
-/**
- * Generic incoming XMPP packet handler interface. This should only be
- * implemented by {@link XmppManager}, regardless of which wire transport is in
- * use.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public interface IncomingPacketHandler {
-
- /**
- * Accept a generic XMPP packet from on-the-wire.
- */
- void receivePacket(Packet packet);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java
deleted file mode 100644
index 4899df0..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-
-/**
- * Generic outgoing XMPP packet transport interface. Should be implemented by
- * the handling XMPP transport (e.g. component system).
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public interface OutgoingPacketTransport {
-
- /**
- * Send a packet over-the-wire to its prescribed destination address. Provides
- * no guarantees of delivery or callback.
- */
- void sendPacket(Packet packet);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java
deleted file mode 100644
index e86a091..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-
-/**
- * Simple callback type used for sending and receiving reliable XMPP packet
- * messages. This allows for clearly defined success and failure states.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public interface PacketCallback {
- void run(Packet packet);
- void error(FederationError error);
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java
deleted file mode 100644
index b6d5868..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.Sets;
-
-import org.dom4j.Attribute;
-import org.dom4j.Element;
-import org.joda.time.DateTimeUtils;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Packet;
-
-import java.security.SecureRandom;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-/**
- * Represents XMPP disco status for a specific remote domain. This class only
- * exposes one public method; {@link #discoverRemoteJID}.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class RemoteDisco {
- private static final Logger LOG = Logger.getLogger(RemoteDisco.class.getCanonicalName());
-
- static final int MAXIMUM_DISCO_ATTEMPTS = 5;
- static final int MINIMUM_REXMIT_MS = 15000;
- static final int REXMIT_JITTER_MS = 2000;
- static final int DISCO_INFO_TIMEOUT = 20;
-
- private final long creationTimeMillis;
- private final long failExpirySecs;
- private final long successExpirySecs;
-
- enum Status {
- INIT, PENDING, COMPLETE
- }
-
- private final Random random = new SecureRandom();
- private final XmppManager manager;
- private final String remoteDomain;
- private final AtomicReference<Status> status;
- private final Queue<SuccessFailCallback<String, String>> pending;
-
- // Result JID field that will be available on COMPLETE status.
- private String remoteJid;
-
- // Error field that will be available on COMPLETE status.
- private FederationError error;
-
-
- // These two values are used for tracking success and failure counts.
- // Not yet exposed in the fedone waveserver.
- public static final LoadingCache<String, AtomicLong> statDiscoSuccess =
- CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
- @Override
- public AtomicLong load(String domain) {
- return new AtomicLong();
- }
- });
-
- public static final LoadingCache<String, AtomicLong> statDiscoFailed =
- CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
- @Override
- public AtomicLong load(String domain) {
- return new AtomicLong();
- }
- });
-
- /**
- * Construct a new RemoteDisco targeting the given domain. This will not kick
- * off the disco request itself.
- * @param manager XmppManager object, used to send packets
- * @param remoteDomain the name of the remote domain (not JID)
- * @param failExpirySecs how long to keep alive a failed disco result
- * @param successExpirySecs how long to keep alive a successful disco result
- */
- public RemoteDisco(XmppManager manager, String remoteDomain, long failExpirySecs,
- long successExpirySecs) {
- this.manager = manager;
- status = new AtomicReference<Status>(Status.INIT);
- pending = new ConcurrentLinkedQueue<SuccessFailCallback<String, String>>();
- this.remoteDomain = remoteDomain;
- this.creationTimeMillis = DateTimeUtils.currentTimeMillis();
- this.failExpirySecs = failExpirySecs;
- this.successExpirySecs = successExpirySecs;
- }
-
- /**
- * Construct a new RemoteDisco - purely for testing - with an already
- * determined result. Either jid or error must be passed.
- *
- * @param remoteDomain the name of the remote domain (not JID)
- * @param jid the domain's remote JID
- * @param error the error from disco
- */
- @VisibleForTesting
- RemoteDisco(String remoteDomain, String jid, FederationError error) {
- Preconditions.checkArgument((jid != null)^(error != null));
-
- manager = null;
- status = new AtomicReference<Status>(Status.COMPLETE);
- pending = null;
- this.remoteDomain = remoteDomain;
- this.remoteJid = jid;
- this.error = error;
- // defaults for testing
- this.creationTimeMillis = DateTimeUtils.currentTimeMillis();
- this.failExpirySecs = 2 * 60;
- this.successExpirySecs = 2 * 60 * 60;
- }
-
- /**
- * Check whether the request is currently PENDING. Visible only for tests.
- * @return true if pending else false
- */
- @VisibleForTesting
- boolean isRequestPending() {
- return status.get().equals(Status.PENDING);
- }
-
- /**
- * Attempt to discover the remote JID for this domain. If the JID has already
- * been discovered, then this method will invoke the callback immediately.
- * Otherwise, the callback is guaranteed to be invoked at a later point.
- *
- * @param callback a callback to be invoked when disco is complete
- */
- public void discoverRemoteJID(SuccessFailCallback<String, String> callback) {
- if (status.get().equals(Status.COMPLETE)) {
- complete(callback);
- } else if (status.compareAndSet(Status.INIT, Status.PENDING)) {
- pending.add(callback);
- startDisco();
- } else {
- pending.add(callback);
-
- // If we've become complete since the start of this method, complete
- // all possible callbacks.
- if (status.get().equals(Status.COMPLETE)) {
- SuccessFailCallback<String, String> item;
- while ((item = pending.poll()) != null) {
- complete(item);
- }
- }
- }
- }
-
- /**
- * Returns true if this RemoteDisco's time to live is exceeded.
- *
- * We can't use MapMaker's expiration code as it won't let us have different expiry for
- * successful and failed cases.
- *
- * @return whether this object should be deleted and recreated
- */
- public boolean ttlExceeded() {
- if (status.get() == Status.COMPLETE) {
- if (remoteJid == null) {
- // Failed disco case
- if (DateTimeUtils.currentTimeMillis() >
- (creationTimeMillis + (1000 * failExpirySecs))) {
- return true;
- }
- } else {
- // Successful disco case
- if (DateTimeUtils.currentTimeMillis() >
- (creationTimeMillis + (1000 * successExpirySecs))) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Complete any specific callback (in the current thread). Requires the status
- * to be COMPLETE.
- *
- * TODO(thorogood): thread model for completing callbacks
- * @param callback the callback to complete
- */
- private void complete(SuccessFailCallback<String, String> callback) {
- Preconditions.checkState(status.get().equals(Status.COMPLETE));
- if (remoteJid != null) {
- callback.onSuccess(remoteJid);
- } else {
- // TODO(thorogood): better toString, or change failure type to FederationError
- callback.onFailure(error.toString());
- }
- }
-
- /**
- * Start XMPP discovery. Kicks off a retrying call to dial-up the remote
- * server and discover its available disco items.
- *
- * This should only be called by a method holding the PENDING state.
- */
- private void startDisco() {
- final IQ request = manager.createRequestIQ(remoteDomain);
- request.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
-
- final Runnable requester = new Runnable() {
- int attempt = 0;
-
- final PacketCallback callback = new PacketCallback() {
- @Override
- public void run(Packet result) {
- Preconditions.checkArgument(result instanceof IQ, "Manager must provide response IQ");
- processDiscoItemsResult((IQ) result);
- }
-
- @Override
- public void error(FederationError error) {
- if (error.getErrorCode().equals(FederationError.Code.REMOTE_SERVER_TIMEOUT)) {
- retry();
- } else {
- LOG.info("Remote server " + remoteDomain + " failed on disco items: "
- + error.getErrorCode());
- processDiscoItemsResult(null);
- }
- }
- };
-
- void retry() {
- attempt += 1;
- if (attempt > MAXIMUM_DISCO_ATTEMPTS) {
- finish(null, FederationErrors
- .newFederationError(FederationError.Code.REMOTE_SERVER_TIMEOUT));
- } else {
- // TODO(thorogood): fix ms/seconds!
- int timeout = nextDiscoRetransmitTimeout(attempt) / 1000;
- request.setID(XmppUtil.generateUniqueId());
- LOG.info("Sending disco items request for: " + remoteDomain + ", timeout " + timeout
- + " seconds");
- manager.send(request, callback, timeout);
- }
- }
-
- @Override
- public void run() {
- retry();
- }
- };
-
- // Kick off requester!
- requester.run();
- }
-
- /**
- * Calculate the requested timeout for any given request number. Introduces
- * random jitter.
- *
- * @param attempt the attempt count
- * @return request timeout in ms
- */
- private int nextDiscoRetransmitTimeout(int attempt) {
- Preconditions.checkArgument(attempt > 0);
- return MINIMUM_REXMIT_MS * (1 << (attempt - 1)) + random.nextInt(REXMIT_JITTER_MS);
- }
-
- /**
- * Process a returned set of disco items. Invoke a query for each item in
- * parallel, searching for any item which supports Wave.
- *
- * @param result IQ stanza provided from disco items, if null try default items
- */
- private void processDiscoItemsResult(@Nullable IQ result) {
- Set<String> candidates = Sets.newHashSet();
-
- // Traverse the source list, finding possible JID candidates.
- if (result != null) {
- List<Element> items = XmppUtil.toSafeElementList(result.getChildElement().elements("item"));
- for (Element item : items) {
- Attribute jid = item.attribute("jid");
- if (jid != null) {
- candidates.add(jid.getValue());
- }
- }
- }
-
- // Returned nothing for the items list. Try the domain itself.
- if (candidates.isEmpty()) {
- candidates.add(remoteDomain);
- }
-
- // Always query 'wave.', as an automatic fallback.
- candidates.add("wave." + remoteDomain);
-
- // Iterate over all candidates, requesting information in parallel.
- AtomicInteger sharedLatch = new AtomicInteger(candidates.size());
- for (String candidate : candidates) {
- requestDiscoInfo(candidate, sharedLatch);
- }
- }
-
- /**
- * Request disco info from a specific target JID. Accepts a target JID as well
- * as a shared latch: on a result, the latch should be decremented and if it
- * reaches zero, finish() must be invoked with an error.
- *
- * @param target the target JID
- * @param sharedLatch a shared latch
- */
- private void requestDiscoInfo(String target, final AtomicInteger sharedLatch) {
- final IQ request = manager.createRequestIQ(target);
- request.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO);
-
- PacketCallback callback = new PacketCallback() {
- @Override
- public void error(FederationError error) {
- int currentCount = sharedLatch.decrementAndGet();
- Preconditions.checkState(currentCount >= 0,
- "Info latch should not count down past zero for domain: %s", remoteDomain);
- if (currentCount == 0) {
- finish(null, error);
- }
- }
-
- @Override
- public void run(Packet packet) {
- Preconditions.checkArgument(packet instanceof IQ);
- IQ result = (IQ) packet;
-
- List<Element> features =
- XmppUtil.toSafeElementList(result.getChildElement().elements("feature"));
- for (Element feature : features) {
- Attribute var = feature.attribute("var");
- if (var != null && var.getValue().equals(XmppNamespace.NAMESPACE_WAVE_SERVER)) {
- String targetJID = packet.getFrom().toString();
- finish(targetJID, null);
-
- // Decrement the latch *after* finishing, so we don't allow an error
- // callback to be kicked off.
- Preconditions.checkState(sharedLatch.decrementAndGet() >= 0,
- "Info latch should not count down past zero for domain: %s", remoteDomain);
- return;
- }
- }
-
- // This result didn't contain a useful result JID, so cause an error.
- error(FederationErrors.newFederationError(FederationError.Code.ITEM_NOT_FOUND));
- }
- };
-
- LOG.info("Sending disco info request for: " + target);
- manager.send(request, callback, DISCO_INFO_TIMEOUT);
- }
-
- /**
- * Finish this disco attempt with either a success or error result. This
- * method should only be called on a thread that owns the PENDING state and
- * will (if successful) result in a transition to COMPLETE. If the disco
- * attempt is already complete, return false and do nothing (safe operation).
- *
- * @param jid success JID, or null
- * @param error error proto, or null
- * @return true if successful, false if already finished
- */
- @VisibleForTesting
- boolean finish(String jid, FederationError error) {
- Preconditions.checkArgument((jid != null)^(error != null));
- if (!status.compareAndSet(Status.PENDING, Status.COMPLETE)) {
- return false;
- }
-
- // Set either the result JID or error state.
-
- try {
- if (jid != null) {
- this.remoteJid = jid;
- LOG.info("Discovered remote JID: " + jid + " for " + remoteDomain);
- statDiscoSuccess.get(remoteDomain).incrementAndGet();
- } else if (error != null) {
- this.error = error;
- LOG.info("Could not discover remote JID: " + error + " for " + remoteDomain);
- statDiscoFailed.get(remoteDomain).incrementAndGet();
- } else {
- throw new IllegalArgumentException("At least one of jid/error must be set");
- }
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
- }
-
- // Complete all available callbacks.
- SuccessFailCallback<String, String> item;
- while ((item = pending.poll()) != null) {
- complete(item);
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java
deleted file mode 100644
index d88a141..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-/**
- * A generic onSuccess/onFailure callback interface.
- *
- * @param <SuccessValue> type of the value passed to {@link #onSuccess}
- * @param <FailureValue> type of the value passed to {@link #onFailure}
- *
- * @author kalman@google.com (Ben Kalman)
- */
-public interface SuccessFailCallback<SuccessValue, FailureValue> {
- void onSuccess(SuccessValue response); // reports the successful result
- void onFailure(FailureValue reason); // reports why the operation failed
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java
deleted file mode 100644
index 3e03ef7..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.xmpp.packet.IQ;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Implementation of XMPP Discovery. Provides public methods to respond to incoming disco requests
- * (via {@link XmppManager}), as well as outgoing disco via {@link #discoverRemoteJid}.
- *
- * <p>Outgoing discovery state is kept per remote domain in a cache of {@link RemoteDisco}
- * objects; entries are replaced once their own success/failure time to live has passed.
- *
- * @author arb@google.com (Anthony Baxter)
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class XmppDisco {
-
-  private static final Logger LOG = Logger.getLogger(XmppDisco.class.getCanonicalName());
-
-  // This tracks the number of disco attempts started.
-  public static final LoadingCache<String, AtomicLong> statDiscoStarted =
-      CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
-        @Override
-        public AtomicLong load(@SuppressWarnings("NullableProblems") String domain) {
-          return new AtomicLong();
-        }
-      });
-
-  // One RemoteDisco per remote domain, created lazily on first lookup.
-  private final LoadingCache<String, RemoteDisco> discoRequests;
-  private final String serverDescription;
-
-  private XmppManager manager = null;
-  // Accessed by XmppFederationHostForDomain.
-  final long failExpirySecs;
-  final long successExpirySecs;
-  final long discoExpirationHours;
-  final String discoInfoCategory;
-  final String discoInfoType;
-
-  /**
-   * Constructor. Note that {@link #setManager} must be called before this class is ready to use.
-   *
-   * @param config configuration providing the {@code federation.*} settings read below
-   */
-  @Inject
-  public XmppDisco(Config config) {
-    this.serverDescription = config.getString("federation.xmpp_server_description");
-    this.discoInfoCategory = config.getString("federation.disco_info_category");
-    this.discoInfoType = config.getString("federation.disco_info_type");
-    this.failExpirySecs = config.getDuration("federation.xmpp_disco_failed_expiry", TimeUnit.SECONDS);
-    this.successExpirySecs = config.getDuration("federation.xmpp_disco_successful_expiry", TimeUnit.SECONDS);
-    this.discoExpirationHours = config.getDuration("federation.disco_expiration", TimeUnit.HOURS);
-
-    //noinspection NullableProblems
-    discoRequests =
-        CacheBuilder.newBuilder().expireAfterWrite(
-            discoExpirationHours, TimeUnit.HOURS).build(
-            new CacheLoader<String, RemoteDisco>() {
-
-              @Override
-              public RemoteDisco load(String domain) throws Exception {
-                // The manager field is read at load time, not at construction time.
-                statDiscoStarted.get(domain).incrementAndGet();
-                return new RemoteDisco(manager, domain, failExpirySecs, successExpirySecs);
-              }
-            });
-  }
-
-  /**
-   * Set the manager instance for this class. Must be invoked before any other
-   * methods are used.
-   * @param manager an XmppManager instance
-   */
-  public void setManager(XmppManager manager) {
-    this.manager = manager;
-  }
-
-  /**
-   * Handles a disco info get from a foreign source. A remote server is trying to ask us what we
-   * support. Send back a message identifying as a wave component.
-   *
-   * @param iq the IQ packet.
-   * @param responseCallback callback used to send response
-   */
-  void processDiscoInfoGet(IQ iq, PacketCallback responseCallback) {
-    IQ response = IQ.createResultIQ(iq);
-    Element query = response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO);
-
-    query.addElement("identity")
-        .addAttribute("category", discoInfoCategory)
-        .addAttribute("type", discoInfoType)
-        .addAttribute("name", serverDescription);
-
-    query.addElement("feature")
-        .addAttribute("var", XmppNamespace.NAMESPACE_WAVE_SERVER);
-
-    responseCallback.run(response);
-  }
-
-
-  /**
-   * Handles a disco items get from a foreign XMPP agent. No useful responses, since we're not a
-   * domain on its own: just the wave component.
-   *
-   * @param iq the IQ packet.
-   * @param responseCallback callback used to send response
-   */
-  void processDiscoItemsGet(IQ iq, PacketCallback responseCallback) {
-    IQ response = IQ.createResultIQ(iq);
-    response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
-    responseCallback.run(response);
-  }
-
-  /**
-   * Attempt to discover the remote JID for this domain. Hands control to {@link RemoteDisco}.
-   *
-   * @param remoteDomain the domain to discover
-   * @param callback a callback to trigger when disco completes
-   * @throws NullPointerException if {@link #setManager} has not been called yet
-   */
-  public void discoverRemoteJid(String remoteDomain, SuccessFailCallback<String, String> callback) {
-    // checkNotNull takes the reference first and the message second; with the
-    // arguments swapped the check could never fail.
-    Preconditions.checkNotNull(manager, "Must call setManager first");
-    RemoteDisco disco = discoRequests.getIfPresent(remoteDomain);
-    if (disco != null) {
-      // This is a race condition, but we don't care if we lose it, because the ttl timestamp
-      // won't be exceeded in that case.
-      if (disco.ttlExceeded()) {
-        if (LOG.isLoggable(Level.FINE)) {
-          LOG.fine("discoverRemoteJid for " + remoteDomain + ": result ttl exceeded.");
-        }
-        // TODO(arb): should we expose the disco cache somehow for debugging?
-        discoRequests.invalidate(remoteDomain);
-      }
-    }
-    try {
-      // Creates a fresh RemoteDisco if none is cached (or it was just invalidated).
-      discoRequests.get(remoteDomain).discoverRemoteJID(callback);
-    } catch (ExecutionException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  /**
-   * Inject a predetermined result into the disco results map. If the passed jid is null, generate
-   * an error/not-found case.
-   *
-   * @param domain remote domain
-   * @param jid remote JID
-   * @throws IllegalStateException if there is already a result for this domain
-   */
-  @VisibleForTesting
-  void testInjectInDomainToJidMap(String domain, String jid) {
-    FederationError error = null;
-    if (jid == null) {
-      error = FederationErrors.badRequest("Fake injected error");
-    }
-    RemoteDisco disco = discoRequests.getIfPresent(domain);
-    Preconditions.checkState(disco == null);
-    discoRequests.put(domain, new RemoteDisco(domain, jid, error));
-  }
-
-  /**
-   * Determine whether a request for the given domain is pending.
-   *
-   * @param domain remote domain
-   * @return true if a disco object exists for the domain and is in the pending state
-   */
-  @VisibleForTesting
-  boolean isDiscoRequestPending(String domain) throws ExecutionException {
-    RemoteDisco disco = discoRequests.getIfPresent(domain);
-    return disco != null && disco.isRequestPending();
-  }
-
-  /**
-   * Determine whether the disco request for the given domain has been touched or is at all
-   * available.
-   *
-   * @param domain remote domain
-   * @return true if a disco object is currently cached for the domain
-   */
-  @VisibleForTesting
-  boolean isDiscoRequestAvailable(String domain) {
-    return discoRequests.getIfPresent(domain) != null;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java
deleted file mode 100644
index 09c76d1..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.FederationHostBridge;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider;
-import org.waveprotocol.wave.federation.xmpp.XmppUtil.UnknownSignerType;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.xmpp.packet.IQ;
-
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.logging.Logger;
-
-/**
- * This class encapsulates the incoming packet processing portion of the
- * Federation Host. Messages arrive on this class from a foreign Federation
- * Remote for wavelets hosted by the local wave server.
- */
-public class XmppFederationHost implements WaveletFederationListener.Factory {
- @SuppressWarnings("unused")
- private static final Logger LOG = Logger.getLogger(XmppFederationHost.class.getCanonicalName());
-
- private final WaveletFederationProvider waveletProvider;
-
- private XmppManager manager = null;
-
- // A map of update listeners. There is one per remote domain we are sending updates to.
- // The name 'listener' refers to them listening for updates from the waveserver to send to the
- // network.
- private final LoadingCache<String, WaveletFederationListener> listeners;
-
-  /**
-   * Creates the host and its per-domain listener cache. Note that
-   * {@link #setManager} must be called before this class is ready to use:
-   * the manager is read when a listener is first created for a domain, not
-   * when this constructor runs.
-   *
-   * @param waveletProvider used for communicating back to the Host part of the
-   *        wavelet server.
-   * @param disco used for discovery; handed to every per-domain listener
-   * @param config configuration handed to every per-domain listener
-   */
-  @Inject
-  public XmppFederationHost(@FederationHostBridge WaveletFederationProvider waveletProvider,
-      final XmppDisco disco, final Config config) {
-    this.waveletProvider = waveletProvider;
-
-    // Builds the listener for a remote domain the first time it is requested.
-    CacheLoader<String, WaveletFederationListener> perDomainLoader =
-        new CacheLoader<String, WaveletFederationListener>() {
-          @Override
-          public WaveletFederationListener load(@SuppressWarnings("NullableProblems") String domain) {
-            return new XmppFederationHostForDomain(domain, manager, disco, config);
-          }
-        };
-    this.listeners = CacheBuilder.newBuilder().build(perDomainLoader);
-  }
-
- /**
- * Set the manager instance for this class. Must be invoked before any other
- * methods are used.
- *
- * The per-domain listener cache reads this field at the moment a listener is
- * created, so a listener created before this call is handed a null manager.
- *
- * @param manager the XmppManager object, used to send packets.
- */
- public void setManager(XmppManager manager) {
- this.manager = manager;
- }
-
-  /**
-   * Builds a ProtocolHashedVersion from untrusted textual input: a decimal
-   * version number and a base64-encoded history hash.
-   *
-   * @param startVersion the version number as a decimal string
-   * @param base64Hash the history hash, base64 encoded
-   * @throws IllegalArgumentException on bad data; a non-numeric version
-   *         surfaces as NumberFormatException, which is a subclass
-   * @return a parsed protobuf object
-   */
-  private static ProtocolHashedVersion parseFromUnsafe(String startVersion, String base64Hash)
-      throws IllegalArgumentException {
-    // The version is parsed before the hash is decoded, as before.
-    long version = Long.parseLong(startVersion);
-    ProtocolHashedVersion.Builder hashedVersion = ProtocolHashedVersion.newBuilder();
-    hashedVersion.setVersion(version);
-    hashedVersion.setHistoryHash(Base64Util.decode(base64Hash));
-    return hashedVersion.build();
-  }
-
- /**
- * Reads a history request off the wire, sends it to the WS with a new
- * callback for returning the response.
- *
- * The request must contain pubsub/items/delta-history with the attributes
- * start-version, start-version-hash, end-version, end-version-hash and
- * wavelet-name; response-length-limit is optional and defaults to 0. Any
- * missing or unparsable piece is reported through responseCallback.error as
- * a bad-request error and nothing is forwarded to the wavelet provider.
- *
- * On success the response is a pubsub/items element with one item per
- * applied delta (base64 in CDATA), followed by an optional commit-notice
- * item and an optional history-truncated item.
- *
- * @param request the history request
- * @param responseCallback the callback to send the response back
- */
- void processHistoryRequest(final IQ request, final PacketCallback responseCallback) {
- Element items = null, historyDelta = null;
- Element pubsubRequest = request.getElement().element("pubsub");
- if (pubsubRequest != null) {
- items = pubsubRequest.element("items");
- if (items != null) {
- historyDelta = items.element("delta-history");
- }
- }
- if (items == null || historyDelta == null
- || historyDelta.attribute("start-version") == null
- || historyDelta.attribute("start-version-hash") == null
- || historyDelta.attribute("end-version") == null
- || historyDelta.attribute("end-version-hash") == null
- || historyDelta.attribute("wavelet-name") == null) {
- responseCallback.error(FederationErrors.badRequest("Malformed history request"));
- return;
- }
-
- final ProtocolHashedVersion startVersion;
- try {
- startVersion = parseFromUnsafe(historyDelta.attributeValue("start-version"),
- historyDelta.attributeValue("start-version-hash"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid format of start version"));
- return;
- }
-
- final ProtocolHashedVersion endVersion;
- try {
- endVersion = parseFromUnsafe(historyDelta.attributeValue("end-version"),
- historyDelta.attributeValue("end-version-hash"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid format of end version"));
- return;
- }
-
- final long responseLengthLimit;
- if (historyDelta.attribute("response-length-limit") != null) {
- try {
- responseLengthLimit = Long.parseLong(historyDelta.attributeValue("response-length-limit"));
- } catch (NumberFormatException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid response length limit"));
- return;
- }
- } else {
- responseLengthLimit = 0;
- }
-
- final WaveletName waveletName;
- try {
- waveletName =
- XmppUtil.waveletNameCodec.uriToWaveletName(historyDelta.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- responseCallback.error(FederationErrors.badRequest(
- "Malformed wavelet name: " + historyDelta.attributeValue("wavelet-name")));
- return;
- }
-
- // Construct a new response listener inline.
- WaveletFederationProvider.HistoryResponseListener listener =
- new WaveletFederationProvider.HistoryResponseListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess(List<ByteString> appliedDeltaSet,
- ProtocolHashedVersion lastCommittedVersion, long versionTruncatedAt) {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element items = pubsub.addElement("items");
-
- // Add each delta to the outgoing response.
- for (ByteString appliedDelta : appliedDeltaSet) {
- items.addElement("item").addElement("applied-delta",
- XmppNamespace.NAMESPACE_WAVE_SERVER).addCDATA(
- Base64Util.encode(appliedDelta.toByteArray()));
- }
-
- // Set the LCV history-hash, if provided.
- // TODO(thorogood): We don't set the hashed version, which is wrong,
- // but it's not part of the current spec (Feb 2010).
- if (lastCommittedVersion != null && lastCommittedVersion.hasVersion()) {
- String version = String.valueOf(lastCommittedVersion.getVersion());
- items.addElement("item").addElement("commit-notice",
- XmppNamespace.NAMESPACE_WAVE_SERVER).addAttribute("version", version);
- }
-
- // Set the version truncated at, if provided.
- if (versionTruncatedAt > 0) {
- String version = String.valueOf(versionTruncatedAt);
- items.addElement("item").addElement("history-truncated",
- XmppNamespace.NAMESPACE_WAVE_SERVER).addAttribute("version", version);
- }
-
- // Send the message to the source.
- responseCallback.run(response);
- }
- };
-
- // Hand off a history request to the waveletProvider.
- // TODO(thorogood,arb): Note that the following remote domain is going to be
- // the Wave component JID (e.g. wave.foo.com), and *not* the actual remote domain.
- String remoteDomain = request.getFrom().getDomain();
- waveletProvider.requestHistory(waveletName, remoteDomain, startVersion,
- endVersion, responseLengthLimit, listener);
- }
-
- /**
- * Handles a submit request from a foreign wave remote. Sends it to the wave
- * server, sets up a callback to send the response.
- * @param request the submit request
- * @param responseCallback the callback to send the response back
- */
- void processSubmitRequest(final IQ request, final PacketCallback responseCallback) {
- Element item = null, submitRequest = null, deltaElement = null;
- Element pubsubRequest = request.getElement().element("pubsub");
- // TODO: check for correct elements.
- Element publish = pubsubRequest.element("publish");
- if (publish != null) {
- item = publish.element("item");
- if (item != null) {
- submitRequest = item.element("submit-request");
- if (submitRequest != null) {
- deltaElement = submitRequest.element("delta");
- }
- }
- }
- if (publish == null || item == null || submitRequest == null
- || deltaElement == null
- || deltaElement.attribute("wavelet-name") == null
- || deltaElement.getText() == null) {
- responseCallback.error(FederationErrors.badRequest("Malformed submit request"));
- return;
- }
-
- final WaveletName waveletName;
- try {
- waveletName =
- XmppUtil.waveletNameCodec.uriToWaveletName(deltaElement.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- responseCallback.error(FederationErrors.badRequest(
- "Malformed wavelet name: " + deltaElement.attributeValue("wavelet-name")));
- return;
- }
-
- final ProtocolSignedDelta delta;
- try {
- delta = ProtocolSignedDelta.parseFrom(Base64Util.decode(deltaElement.getText()));
- } catch (InvalidProtocolBufferException e) {
- responseCallback.error(FederationErrors.badRequest(
- "Malformed delta, not a valid protocol buffer"));
- return;
- }
-
- // Construct a submit result listener inline.
- WaveletFederationProvider.SubmitResultListener listener =
- new WaveletFederationProvider.SubmitResultListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess(int operations, ProtocolHashedVersion version, long timestamp) {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element submitResponse = pubsub.addElement("publish").addElement("item")
- .addElement("submit-response", XmppNamespace.NAMESPACE_WAVE_SERVER);
-
- submitResponse.addAttribute("application-timestamp", String.valueOf(timestamp));
- submitResponse.addAttribute("operations-applied", String.valueOf(operations));
-
- Element hashedVersion = submitResponse.addElement("hashed-version");
- hashedVersion.addAttribute("history-hash", Base64Util.encode(version.getHistoryHash()));
- hashedVersion.addAttribute("version", String.valueOf(version.getVersion()));
-
- responseCallback.run(response);
- }
- };
-
- // Hand off the submit request to the wavelet provider.
- waveletProvider.submitRequest(waveletName, delta, listener);
- }
-
- /**
- * Reads a get signer request off the wire, sends it to the WS with a new
- * callback for returning the response.
- * @param request the get signer request
- * @param responseCallback the callback to send the response back
- */
- void processGetSignerRequest(final IQ request, final PacketCallback responseCallback) {
- Element items = request.getChildElement().element("items");
- Element signerRequest = items != null ? items.element("signer-request") : null;
-
- if (items == null || signerRequest == null
- || signerRequest.attributeValue("wavelet-name") == null
- || signerRequest.attributeValue("signer-id") == null
- || signerRequest.attributeValue("version") == null
- || signerRequest.attributeValue("history-hash") == null) {
- manager.sendErrorResponse(request, FederationErrors.badRequest("Malformed signer request"));
- return;
- }
-
- final ByteString signerId;
- try {
- signerId = Base64Util.decode(signerRequest.attributeValue("signer-id"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Malformed signer ID"));
- return;
- }
-
- final ProtocolHashedVersion deltaEndVersion;
- try {
- deltaEndVersion = parseFromUnsafe(signerRequest.attributeValue("version"),
- signerRequest.attributeValue("history-hash"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid hashed version"));
- return;
- }
-
- final WaveletName waveletName;
- try {
- waveletName =
- XmppUtil.waveletNameCodec.uriToWaveletName(signerRequest.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- responseCallback.error(FederationErrors.badRequest("Malformed wavelet name"));
- return;
- }
-
- WaveletFederationProvider.DeltaSignerInfoResponseListener listener =
- new WaveletFederationProvider.DeltaSignerInfoResponseListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess(ProtocolSignerInfo signerInfo) {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element items = pubsub.addElement("items");
- XmppUtil.protocolSignerInfoToXml(signerInfo, items);
-
- responseCallback.run(response);
- }
-
- };
-
- waveletProvider.getDeltaSignerInfo(signerId, waveletName, deltaEndVersion, listener);
- }
-
- /**
- * Reads a post signer request off the wire, sends it to the WS with a new
- * callback for returning the response.
- * @param request the post signer request
- * @param responseCallback the callback to send the response back
- */
- void processPostSignerRequest(final IQ request, final PacketCallback responseCallback) {
- Element item = null, signatureElement = null;
- Element pubsubRequest = request.getElement().element("pubsub");
- Element publish = pubsubRequest.element("publish");
- if (publish != null) {
- item = publish.element("item");
- if (item != null) {
- signatureElement = item.element("signature");
- }
- }
-
- if (publish == null || item == null || signatureElement == null
- || signatureElement.attribute("domain") == null
- || signatureElement.attribute("algorithm") == null
- || signatureElement.element("certificate") == null) {
- responseCallback.error(FederationErrors.badRequest("Malformed post signer request"));
- return;
- }
-
- ProtocolSignerInfo signer;
- try {
- signer = XmppUtil.xmlToProtocolSignerInfo(signatureElement);
- } catch (UnknownSignerType e) {
- responseCallback.error(FederationErrors.badRequest(
- "Could not understand signer algorithm: " + e));
- return;
- }
-
- WaveletFederationProvider.PostSignerInfoResponseListener listener =
- new WaveletFederationProvider.PostSignerInfoResponseListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess() {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element item = pubsub.addElement("publish").addElement("item");
-
- item.addAttribute("node", "signer");
- item.addElement("signature-response", XmppNamespace.NAMESPACE_WAVE_SERVER);
-
- responseCallback.run(response);
- }
- };
-
- // TODO(thorogood,arb): This field is a Bad Idea; it could be faked and not
- // be a provider we host on this instance. Instead, we should infer from the
- // "To:" JID.
- String targetDomain = signatureElement.attributeValue("domain");
-
- // The first argument is the domain we intend to send this information to.
- waveletProvider.postSignerInfo(targetDomain, signer, listener);
- }
-
- @Override
- public WaveletFederationListener listenerForDomain(String domain) {
- try {
- // TODO(thorogood): Kick off disco here instead of inside
- // XmppFederationHostForDomain.
- return listeners.get(domain);
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java
deleted file mode 100644
index d809b33..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.Inject;
-import com.google.protobuf.ByteString;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * An instance of this class is created on demand for outgoing
- * messages to another wave Federation Remote. The wave server asks
- * the XmppFederationHost to create these.
- */
-class XmppFederationHostForDomain implements WaveletFederationListener {
-
- private static final Logger LOG =
- Logger.getLogger(XmppFederationHostForDomain.class.getCanonicalName());
-
- // Timeout for outstanding listener updates sent over XMPP.
- private static final int XMPP_LISTENER_TIMEOUT = 30;
-
- private final String remoteDomain;
- private final XmppManager manager;
- private final String jid;
- private final XmppDisco disco;
-
- @Inject
- public XmppFederationHostForDomain(final String domain, XmppManager manager,
- XmppDisco disco, Config config) {
- this.remoteDomain = domain;
- this.manager = manager;
- this.jid = config.getString("federation.xmpp_jid");
- this.disco = disco;
- }
-
- @Override
- public void waveletCommitUpdate(WaveletName waveletName, ProtocolHashedVersion committedVersion,
- WaveletUpdateCallback callback) {
- waveletUpdate(waveletName, null, committedVersion, callback);
- }
-
- @Override
- public void waveletDeltaUpdate(WaveletName waveletName, List<ByteString> appliedDeltas,
- WaveletUpdateCallback callback) {
- waveletUpdate(waveletName, appliedDeltas, null, callback);
- }
-
- /**
- * Sends a wavelet update message on behalf of the wave server. This
- * method just triggers a disco lookup (which may be cached) and
- * sets up a callback to call the real method that does the work.
- * This method may contain applied deltas, a commit notice, or both.
- *
- * @param waveletName the wavelet name
- * @param deltaList the deltas to include in the message, or null
- * @param committedVersion last committed version to include, or null
- * @param callback callback to invoke on delivery success/failure
- */
- public void waveletUpdate(final WaveletName waveletName, final List<ByteString> deltaList,
- final ProtocolHashedVersion committedVersion, final WaveletUpdateCallback callback) {
- if ((deltaList == null || deltaList.isEmpty()) && committedVersion == null) {
- throw new IllegalArgumentException("Must send at least one delta, or a last committed " +
- "version notice, for the target wavelet: " + waveletName);
- }
-
- disco.discoverRemoteJid(remoteDomain, new SuccessFailCallback<String, String>() {
- @Override
- public void onSuccess(String remoteJid) {
- internalWaveletUpdate(waveletName, deltaList, committedVersion, callback, remoteJid);
- }
-
- @Override
- public void onFailure(String errorMessage) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Disco failed for remote domain " + remoteDomain + ", update not sent");
- }
- callback.onFailure(FederationErrors.newFederationError(
- FederationError.Code.RESOURCE_CONSTRAINT, errorMessage));
- }
- });
- }
-
- /**
- * Sends a wavelet update message on behalf of the wave server once disco is
- * complete. This method may contain applied deltas, a commit notice, or both.
- *
- * @param waveletName the wavelet name
- * @param deltaList the deltas to include in the message, or null
- * @param committedVersion last committed version to include, or null
- * @param callback callback to invoke on delivery success/failure
- * @param remoteJid the remote JID to send the update to
- */
- private void internalWaveletUpdate(final WaveletName waveletName,
- final List<ByteString> deltaList, final ProtocolHashedVersion committedVersion,
- final WaveletUpdateCallback callback, String remoteJid) {
- Message message = new Message();
- message.setType(Message.Type.normal);
- message.setFrom(jid);
- message.setTo(remoteJid);
- message.setID(XmppUtil.generateUniqueId());
- message.addChildElement("request", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
-
- final String encodedWaveletName;
- try {
- encodedWaveletName = XmppUtil.waveletNameCodec.waveletNameToURI(waveletName);
- } catch (EncodingException e) {
- callback.onFailure(FederationErrors.badRequest("Bad wavelet name " + waveletName));
- return;
- }
-
- Element itemElement = message.addChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT)
- .addElement("items").addElement("item");
- if (deltaList != null) {
- for (ByteString delta : deltaList) {
- Element waveletUpdate =
- itemElement.addElement("wavelet-update", XmppNamespace.NAMESPACE_WAVE_SERVER)
- .addAttribute("wavelet-name", encodedWaveletName);
- waveletUpdate.addElement("applied-delta").addCDATA(Base64Util.encode(delta.toByteArray()));
- }
- }
- if (committedVersion != null) {
- Element waveletUpdate =
- itemElement.addElement("wavelet-update", XmppNamespace.NAMESPACE_WAVE_SERVER)
- .addAttribute("wavelet-name", encodedWaveletName);
- waveletUpdate.addElement("commit-notice").addAttribute("version",
- Long.toString(committedVersion.getVersion())).addAttribute("history-hash",
- Base64Util.encode(committedVersion.getHistoryHash()));
- }
-
- // Send the generated message through to the foreign XMPP server.
- manager.send(message, new PacketCallback() {
- @Override
- public void error(FederationError error) {
- callback.onFailure(error);
- }
-
- @Override
- public void run(Packet packet) {
- callback.onSuccess();
- }
- }, XMPP_LISTENER_TIMEOUT);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java
deleted file mode 100644
index 0be6e94..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-
-import org.waveprotocol.wave.federation.FederationHostBridge;
-import org.waveprotocol.wave.federation.FederationRemoteBridge;
-import org.waveprotocol.wave.federation.FederationTransport;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider;
-
-/**
- * Module for setting up an XMPP federation subsystem
- *
- * @author tad.glines@gmail.com (Tad Glines)
- */
-public class XmppFederationModule extends AbstractModule {
-
- @Override
- protected void configure() {
- // Request history and submit deltas to the outside world *from* our local
- // Wave Server.
- bind(WaveletFederationProvider.class).annotatedWith(FederationRemoteBridge.class).to(
- XmppFederationRemote.class).in(Singleton.class);
-
- // Serve updates to the outside world about local waves.
- bind(WaveletFederationListener.Factory.class).annotatedWith(FederationHostBridge.class).to(
- XmppFederationHost.class).in(Singleton.class);
-
- bind(XmppDisco.class).in(Singleton.class);
- bind(XmppFederationRemote.class).in(Singleton.class);
- bind(XmppFederationHost.class).in(Singleton.class);
-
- bind(XmppManager.class).in(Singleton.class);
- bind(IncomingPacketHandler.class).to(XmppManager.class);
- bind(ComponentPacketTransport.class).in(Singleton.class);
- bind(OutgoingPacketTransport.class).to(ComponentPacketTransport.class);
-
- bind(FederationTransport.class).to(XmppFederationTransport.class).in(Singleton.class);
- }
-
-}