You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/06/01 11:33:13 UTC
[23/50] [abbrv] git commit: S4-123: Javadoc updates
S4-123: Javadoc updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/38c2d6d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/38c2d6d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/38c2d6d7
Branch: refs/heads/master
Commit: 38c2d6d72824c08df5911d86680ef9d8ebe306ed
Parents: faf0da4 66c81de
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Mar 10 11:27:06 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sun Mar 10 13:29:05 2013 +0100
----------------------------------------------------------------------
build.gradle | 15 +-
.../src/main/java/org/apache/s4/base/Event.java | 3 +
.../java/org/apache/s4/base/GenericKeyFinder.java | 8 +-
.../src/main/java/org/apache/s4/base/Key.java | 2 +-
.../main/java/org/apache/s4/base/KeyFinder.java | 3 +
.../main/java/org/apache/s4/base/package-info.java | 2 +-
.../java/org/apache/s4/base/util/S4RLoader.java | 4 +-
.../org/apache/s4/benchmark/dag/package-info.java | 22 ++
.../apache/s4/benchmark/prodcon/package-info.java | 22 ++
.../apache/s4/benchmark/utils/package-info.java | 22 ++
.../java/org/apache/s4/comm/DefaultCommModule.java | 3 -
.../org/apache/s4/comm/ModulesLoaderFactory.java | 4 +-
.../ThrottlingThreadPoolExecutorService.java | 3 -
.../org/apache/s4/comm/staging/package-info.java | 22 ++
.../apache/s4/comm/util/ArchiveFetchException.java | 14 -
.../org/apache/s4/comm/util/ArchiveFetcher.java | 42 ----
.../s4/comm/util/FileSystemArchiveFetcher.java | 41 ----
.../apache/s4/comm/util/HttpArchiveFetcher.java | 187 ---------------
.../org/apache/s4/comm/util/RemoteFileFetcher.java | 23 --
.../org/apache/s4/fixtures/TestCommModule.java | 4 -
.../main/java/org/apache/s4/core/AppModule.java | 13 +
.../main/java/org/apache/s4/core/BaseModule.java | 8 +-
.../java/org/apache/s4/core/DefaultCoreModule.java | 3 +-
.../org/apache/s4/core/DefaultRemoteSenders.java | 3 +
.../java/org/apache/s4/core/ProcessingElement.java | 4 +-
.../main/java/org/apache/s4/core/ReceiverImpl.java | 4 +-
.../main/java/org/apache/s4/core/RemoteSender.java | 2 +-
.../main/java/org/apache/s4/core/S4Bootstrap.java | 4 +-
.../src/main/java/org/apache/s4/core/S4Node.java | 2 +-
.../main/java/org/apache/s4/core/SenderImpl.java | 8 +-
.../org/apache/s4/core/adapter/AdapterApp.java | 2 +
.../main/java/org/apache/s4/core/package-info.java | 4 +-
...ottlingRemoteSendersExecutorServiceFactory.java | 3 +-
.../org/apache/s4/core/staging/package-info.java | 25 ++
.../java/org/apache/s4/core/util/AppConfig.java | 5 +
.../apache/s4/core/util/ArchiveFetchException.java | 16 ++
.../org/apache/s4/core/util/ArchiveFetcher.java | 42 ++++
.../s4/core/util/FileSystemArchiveFetcher.java | 41 ++++
.../apache/s4/core/util/HttpArchiveFetcher.java | 187 +++++++++++++++
.../org/apache/s4/core/util/RemoteFileFetcher.java | 23 ++
.../java/org/apache/s4/core/util/S4Metrics.java | 4 +
.../s4/core/window/AbstractSlidingWindowPE.java | 18 ++-
.../java/org/apache/s4/core/window/OHCLSlot.java | 79 ------
.../java/org/apache/s4/core/window/OHLCSlot.java | 82 +++++++
.../org/apache/s4/core/window/package-info.java | 5 +-
.../java/org/apache/s4/deploy/DeploymentUtils.java | 15 ++
.../java/org/apache/s4/deploy/package-info.java | 3 +-
.../main/java/org/apache/s4/tools/CreateApp.java | 3 +
.../java/org/apache/s4/tools/DefineCluster.java | 3 +
.../src/main/java/org/apache/s4/tools/Deploy.java | 7 +-
.../org/apache/s4/tools/FileExistsValidator.java | 3 +
.../src/main/java/org/apache/s4/tools/Package.java | 6 +-
.../main/java/org/apache/s4/tools/S4ArgsBase.java | 3 +
.../src/main/java/org/apache/s4/tools/Status.java | 6 +-
.../src/main/java/org/apache/s4/tools/Tools.java | 3 +
.../main/java/org/apache/s4/tools/ZKServer.java | 4 +
56 files changed, 652 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/build.gradle
----------------------------------------------------------------------
diff --cc build.gradle
index 3193075,3193075..3025030
--- a/build.gradle
+++ b/build.gradle
@@@ -187,15 -187,15 +187,18 @@@ dependencies
task javadoc (type: Javadoc) {
destinationDir = new File(project.rootDir, 'doc/javadoc')
destinationDir.deleteDir()
--
-- title = "Apache S4 " + version
-- source platformProjects.collect { project ->
-- project.sourceSets.main.allJava
++ excludes = ['org.apache.s4.benchmark']
++ project.ext["documentedProjects"] = new HashSet(platformProjects)
++ documentedProjects.remove(project(':s4-benchmarks'))
++ title = "Apache S4 " + version
++ source documentedProjects.collect { project ->
++ project.sourceSets.main.allJava
}
// Might need a classpath
-- classpath = files(subprojects.collect { project ->
-- project.sourceSets.main.compileClasspath
++ classpath = files(documentedProjects.collect { project ->
++ project.sourceSets.main.compileClasspath
})
++
}
// TODO parameterize
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index bad2555,bad2555..40c4779
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@@ -61,9 -61,9 +61,6 @@@ public class DefaultCommModule extends
*
* @param commConfigInputStream
* input stream from a configuration file
-- * @param clusterName
-- * the name of the cluster to which the current node belongs. If specified in the configuration file,
-- * this parameter will be ignored.
*/
public DefaultCommModule(InputStream commConfigInputStream) {
super();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
index 2b89039,2b89039..5023e25
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
@@@ -38,8 -38,8 +38,8 @@@ public class ModulesLoaderFactory
*
* Inspired from Hadoop's application classloading implementation (RunJar class).
*
-- * @param modulesJarPath
-- * path to s4r
++ * @param modulesFiles
++ * files containing modules classes
* @return classloader that loads resources from the archive in a predefined order
*/
public ModulesLoader createModulesLoader(Iterable<File> modulesFiles) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
index 0b10590,0b10590..7e7b1d0
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
@@@ -42,9 -42,9 +42,6 @@@ public class ThrottlingThreadPoolExecut
*
* @param parallelism
* Maximum number of threads in the pool
-- * @param fairParallelism
-- * If true, in case of contention, waiting threads will be scheduled in a first-in first-out manner. This
-- * can help ensure ordering, though there is an associated performance cost (typically small).
* @param threadName
* Naming scheme
* @param workQueueSize
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
index 8567cce,8567cce..0000000
deleted file mode 100644,100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
+++ /dev/null
@@@ -1,14 -1,14 +1,0 @@@
--package org.apache.s4.comm.util;
--
--
--public class ArchiveFetchException extends Exception {
--
-- public ArchiveFetchException(String string) {
-- super(string);
-- }
--
-- public ArchiveFetchException(String string, Throwable throwable) {
-- super(string, throwable);
-- }
--
--}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
index dbd35ba,dbd35ba..0000000
deleted file mode 100644,100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
+++ /dev/null
@@@ -1,42 -1,42 +1,0 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.s4.comm.util;
--
--import java.io.InputStream;
--import java.net.URI;
--
--/**
-- * This interface defines methods to fetch archive files from a URI (S4R or modules jars). Various protocols can be
-- * supported in the implementation classes (e.g. file system, HTTP etc...)
-- *
-- */
--public interface ArchiveFetcher {
--
-- /**
-- * Returns a stream to an archive file
-- *
-- * @param uri
-- * archive identifier
-- * @return an input stream for accessing the content of the archive file
-- * @throws ArchiveFetchException
-- * when fetching fails
-- */
-- InputStream fetch(URI uri) throws ArchiveFetchException;
--
--}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
index 294877f,294877f..0000000
deleted file mode 100644,100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
+++ /dev/null
@@@ -1,41 -1,41 +1,0 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.s4.comm.util;
--
--import java.io.File;
--import java.io.FileInputStream;
--import java.io.FileNotFoundException;
--import java.io.InputStream;
--import java.net.URI;
--
--/**
-- * Fetches modules jar files and application S4R files from a file system, possibly distributed.
-- *
-- */
--public class FileSystemArchiveFetcher implements ArchiveFetcher {
--
-- @Override
-- public InputStream fetch(URI uri) throws ArchiveFetchException {
-- try {
-- return new FileInputStream(new File(uri));
-- } catch (FileNotFoundException e) {
-- throw new ArchiveFetchException("Cannot retrieve file from uri [" + uri.toString() + "]");
-- }
-- }
--}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
index 39b98d9,39b98d9..0000000
deleted file mode 100644,100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
+++ /dev/null
@@@ -1,187 -1,187 +1,0 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.s4.comm.util;
--
--import java.io.File;
--import java.io.FileInputStream;
--import java.io.FileNotFoundException;
--import java.io.FileOutputStream;
--import java.io.IOException;
--import java.io.InputStream;
--import java.net.InetSocketAddress;
--import java.net.URI;
--import java.util.concurrent.Executors;
--
--import org.jboss.netty.bootstrap.ClientBootstrap;
--import org.jboss.netty.buffer.ChannelBuffer;
--import org.jboss.netty.buffer.ChannelBufferInputStream;
--import org.jboss.netty.channel.Channel;
--import org.jboss.netty.channel.ChannelFuture;
--import org.jboss.netty.channel.ChannelHandlerContext;
--import org.jboss.netty.channel.ChannelPipeline;
--import org.jboss.netty.channel.ChannelPipelineFactory;
--import org.jboss.netty.channel.Channels;
--import org.jboss.netty.channel.MessageEvent;
--import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
--import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
--import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
--import org.jboss.netty.handler.codec.http.HttpChunk;
--import org.jboss.netty.handler.codec.http.HttpClientCodec;
--import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
--import org.jboss.netty.handler.codec.http.HttpHeaders;
--import org.jboss.netty.handler.codec.http.HttpMethod;
--import org.jboss.netty.handler.codec.http.HttpRequest;
--import org.jboss.netty.handler.codec.http.HttpResponse;
--import org.jboss.netty.handler.codec.http.HttpVersion;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--import com.google.common.io.ByteStreams;
--
--/**
-- * <p>
-- * Fetches modules and app archives through HTTP.
-- * </p>
-- * <p>
-- * The underlying implementation uses Netty, and borrows code from the Netty snoop example.</br>
-- *
-- * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
-- * snoop example</a>
-- *
-- * </p>
-- */
--public class HttpArchiveFetcher implements ArchiveFetcher {
--
-- private static Logger logger = LoggerFactory.getLogger(HttpArchiveFetcher.class);
--
-- @Override
-- public InputStream fetch(URI uri) throws ArchiveFetchException {
-- logger.debug("Fetching file through http: {}", uri.toString());
--
-- String host = uri.getHost();
-- int port = uri.getPort();
-- if (port == -1) {
-- if (uri.getScheme().equalsIgnoreCase("http")) {
-- port = 80;
-- } else if (uri.getScheme().equalsIgnoreCase("https")) {
-- port = 443;
-- }
-- }
--
-- ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
-- Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-- File tmpFile;
-- try {
-- tmpFile = File.createTempFile("http", "download");
-- } catch (IOException e) {
-- throw new ArchiveFetchException("Cannot create temporary file for fetching archive data from http server",
-- e);
-- }
-- clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
-- ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
-- // TODO timeout?
-- Channel channel = channelFuture.awaitUninterruptibly().getChannel();
-- if (!channelFuture.isSuccess()) {
-- clientBootstrap.releaseExternalResources();
-- throw new ArchiveFetchException("Cannot connect to http uri [" + uri.toString() + "]",
-- channelFuture.getCause());
-- }
--
-- HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath());
-- request.setHeader(HttpHeaders.Names.HOST, host);
-- request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-- request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
--
-- channel.write(request);
--
-- channel.getCloseFuture().awaitUninterruptibly();
--
-- clientBootstrap.releaseExternalResources();
--
-- logger.debug("Finished downloading archive file through http {}, as file: {}", uri.toString(),
-- tmpFile.getAbsolutePath());
-- try {
-- return new FileInputStream(tmpFile);
-- } catch (FileNotFoundException e) {
-- throw new ArchiveFetchException("Cannot get input stream from temporary file with s4r data ["
-- + tmpFile.getAbsolutePath() + "]");
-- }
-- }
--
-- private class HttpClientPipelineFactory implements ChannelPipelineFactory {
--
-- File tmpFile;
--
-- public HttpClientPipelineFactory(File tmpFile) {
-- this.tmpFile = tmpFile;
-- }
--
-- @Override
-- public ChannelPipeline getPipeline() throws Exception {
-- // Create a default pipeline implementation.
-- ChannelPipeline pipeline = Channels.pipeline();
--
-- pipeline.addLast("codec", new HttpClientCodec());
--
-- // Remove the following line if you don't want automatic content decompression.
-- pipeline.addLast("inflater", new HttpContentDecompressor());
--
-- pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
-- return pipeline;
-- }
-- }
--
-- // see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
-- private class HttpResponseHandler extends SimpleChannelUpstreamHandler {
--
-- private boolean readingChunks;
-- FileOutputStream fos;
--
-- public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
-- this.fos = new FileOutputStream(tmpFile);
-- }
--
-- @Override
-- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-- if (!readingChunks) {
-- HttpResponse response = (HttpResponse) e.getMessage();
--
-- if (response.isChunked()) {
-- readingChunks = true;
-- } else {
-- copyContentToTmpFile(response.getContent());
-- }
-- } else {
-- HttpChunk chunk = (HttpChunk) e.getMessage();
-- if (chunk.isLast()) {
-- readingChunks = false;
-- fos.close();
-- } else {
-- copyContentToTmpFile(chunk.getContent());
-- }
-- }
--
-- }
--
-- private void copyContentToTmpFile(ChannelBuffer content) throws IOException, FileNotFoundException {
-- ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
-- ByteStreams.copy(cbis, fos);
-- }
-- }
--}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
index f81b81b,f81b81b..0000000
deleted file mode 100644,100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
+++ /dev/null
@@@ -1,23 -1,23 +1,0 @@@
--package org.apache.s4.comm.util;
--
--import java.io.InputStream;
--import java.net.URI;
--
--/**
-- * Factory for remote file fetchers depending on the access protocol.
-- *
-- */
--public class RemoteFileFetcher implements ArchiveFetcher {
--
-- @Override
-- public InputStream fetch(URI uri) throws ArchiveFetchException {
-- String scheme = uri.getScheme();
-- if ("file".equalsIgnoreCase(scheme)) {
-- return new FileSystemArchiveFetcher().fetch(uri);
-- }
-- if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
-- return new HttpArchiveFetcher().fetch(uri);
-- }
-- throw new ArchiveFetchException("Unsupported protocol " + scheme);
-- }
--}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
index 827ec42,827ec42..55dcaec
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@@ -7,8 -7,8 +7,6 @@@ import org.apache.s4.comm.topology.Assi
import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.comm.topology.ZkClient;
--import org.apache.s4.comm.util.ArchiveFetcher;
--import org.apache.s4.comm.util.RemoteFileFetcher;
import com.google.inject.name.Names;
@@@ -34,8 -34,8 +32,6 @@@ public class TestCommModule extends Def
bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
-- bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
--
ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
zkClient.setZkSerializer(new ZNRecordSerializer());
bind(ZkClient.class).toInstance(zkClient);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 6eda37d,e0bd0e0..4468af7
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@@ -9,8 -9,8 +9,8 @@@ import org.apache.commons.configuration
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.ZkClient;
--import org.apache.s4.comm.util.ArchiveFetcher;
--import org.apache.s4.comm.util.RemoteFileFetcher;
++import org.apache.s4.core.util.ArchiveFetcher;
++import org.apache.s4.core.util.RemoteFileFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 757ce4f,757ce4f..e36c86e
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@@ -22,9 -22,9 +22,9 @@@ import org.apache.s4.comm.DefaultCommMo
import org.apache.s4.comm.ModulesLoaderFactory;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZkClient;
--import org.apache.s4.comm.util.ArchiveFetchException;
--import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.core.util.AppConfig;
++import org.apache.s4.core.util.ArchiveFetchException;
++import org.apache.s4.core.util.ArchiveFetcher;
import org.apache.s4.core.util.ParametersInjectionModule;
import org.apache.s4.deploy.DeploymentFailedException;
import org.apache.zookeeper.CreateMode;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index 907fcb9,907fcb9..f298cde
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@@ -3,7 -3,7 +3,7 @@@ package org.apache.s4.core
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
--import org.apache.s4.comm.util.ArchiveFetchException;
++import org.apache.s4.core.util.ArchiveFetchException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ArchiveFetchException.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/util/ArchiveFetchException.java
index 0000000,0000000..075c703
new file mode 100644
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ArchiveFetchException.java
@@@ -1,0 -1,0 +1,16 @@@
++package org.apache.s4.core.util;
++
++/**
++ * Exception thrown when an archive cannot be fetched correctly
++ */
++public class ArchiveFetchException extends Exception {
++
++ public ArchiveFetchException(String string) {
++ super(string);
++ }
++
++ public ArchiveFetchException(String string, Throwable throwable) {
++ super(string, throwable);
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/util/ArchiveFetcher.java
index 0000000,0000000..d6071ff
new file mode 100644
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ArchiveFetcher.java
@@@ -1,0 -1,0 +1,42 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.s4.core.util;
++
++import java.io.InputStream;
++import java.net.URI;
++
++/**
++ * Defines how archive files (S4R or modules jars) are fetched from a URI. Implementation classes may support
++ * various protocols (e.g. file system, HTTP).
++ *
++ */
++public interface ArchiveFetcher {
++
++ /**
++ * Returns a stream to an archive file
++ *
++ * @param uri
++ * archive identifier
++ * @return an input stream for accessing the content of the archive file
++ * @throws ArchiveFetchException
++ * when fetching fails
++ */
++ InputStream fetch(URI uri) throws ArchiveFetchException;
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/util/FileSystemArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/util/FileSystemArchiveFetcher.java
index 0000000,0000000..a975919
new file mode 100644
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/FileSystemArchiveFetcher.java
@@@ -1,0 -1,0 +1,41 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.s4.core.util;
++
++import java.io.File;
++import java.io.FileInputStream;
++import java.io.FileNotFoundException;
++import java.io.InputStream;
++import java.net.URI;
++
++/**
++ * Fetches modules jar files and application S4R files from a file system, possibly distributed.
++ *
++ */
++public class FileSystemArchiveFetcher implements ArchiveFetcher {
++
++ @Override
++ public InputStream fetch(URI uri) throws ArchiveFetchException {
++ try {
++ return new FileInputStream(new File(uri));
++ } catch (FileNotFoundException e) {
++ throw new ArchiveFetchException("Cannot retrieve file from uri [" + uri.toString() + "]");
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/util/HttpArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/util/HttpArchiveFetcher.java
index 0000000,0000000..9d3fa1e
new file mode 100644
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/HttpArchiveFetcher.java
@@@ -1,0 -1,0 +1,187 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.s4.core.util;
++
++import java.io.File;
++import java.io.FileInputStream;
++import java.io.FileNotFoundException;
++import java.io.FileOutputStream;
++import java.io.IOException;
++import java.io.InputStream;
++import java.net.InetSocketAddress;
++import java.net.URI;
++import java.util.concurrent.Executors;
++
++import org.jboss.netty.bootstrap.ClientBootstrap;
++import org.jboss.netty.buffer.ChannelBuffer;
++import org.jboss.netty.buffer.ChannelBufferInputStream;
++import org.jboss.netty.channel.Channel;
++import org.jboss.netty.channel.ChannelFuture;
++import org.jboss.netty.channel.ChannelHandlerContext;
++import org.jboss.netty.channel.ChannelPipeline;
++import org.jboss.netty.channel.ChannelPipelineFactory;
++import org.jboss.netty.channel.Channels;
++import org.jboss.netty.channel.MessageEvent;
++import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
++import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
++import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
++import org.jboss.netty.handler.codec.http.HttpChunk;
++import org.jboss.netty.handler.codec.http.HttpClientCodec;
++import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
++import org.jboss.netty.handler.codec.http.HttpHeaders;
++import org.jboss.netty.handler.codec.http.HttpMethod;
++import org.jboss.netty.handler.codec.http.HttpRequest;
++import org.jboss.netty.handler.codec.http.HttpResponse;
++import org.jboss.netty.handler.codec.http.HttpVersion;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import com.google.common.io.ByteStreams;
++
++/**
++ * <p>
++ * Fetches modules and app archives through HTTP.
++ * </p>
++ * <p>
++ * The underlying implementation uses Netty, and borrows code from the Netty snoop example. The response body is
++ * first written to a temporary file, and a stream on that file is returned.
++ * </p>
++ *
++ * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
++ *      snoop example</a>
++ */
++public class HttpArchiveFetcher implements ArchiveFetcher {
++
++    private static final Logger logger = LoggerFactory.getLogger(HttpArchiveFetcher.class);
++
++    /**
++     * Downloads the resource designated by the given URI into a temporary file and opens that file.
++     * <p>
++     * This call blocks until the connection to the server is closed. The HTTP status of the response is not
++     * inspected: whatever body the server sends is stored.
++     * </p>
++     * <p>
++     * NOTE(review): the pipeline built by this class contains no SSL handler, so an https URI is contacted on
++     * port 443 without encryption - confirm whether https is really meant to be supported.
++     * </p>
++     *
++     * @param uri
++     *            location of the archive; must have a host, and either an explicit port or the http / https scheme
++     * @return a newly opened stream on the temporary file holding the downloaded data (not closed by this method)
++     * @throws ArchiveFetchException
++     *             if the host or port cannot be determined, the temporary file cannot be created or reopened, or
++     *             the connection to the server fails
++     */
++    @Override
++    public InputStream fetch(URI uri) throws ArchiveFetchException {
++        logger.debug("Fetching file through http: {}", uri.toString());
++
++        String host = uri.getHost();
++        if (host == null) {
++            throw new ArchiveFetchException("Cannot determine host for uri [" + uri.toString() + "]");
++        }
++        int port = resolvePort(uri);
++
++        ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
++                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
++        File tmpFile;
++        try {
++            tmpFile = File.createTempFile("http", "download");
++        } catch (IOException e) {
++            // No connection was attempted yet, but the channel factory already owns thread pools.
++            clientBootstrap.releaseExternalResources();
++            throw new ArchiveFetchException("Cannot create temporary file for fetching archive data from http server",
++                    e);
++        }
++        clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
++        ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
++        // TODO timeout?
++        Channel channel = channelFuture.awaitUninterruptibly().getChannel();
++        if (!channelFuture.isSuccess()) {
++            clientBootstrap.releaseExternalResources();
++            throw new ArchiveFetchException("Cannot connect to http uri [" + uri.toString() + "]",
++                    channelFuture.getCause());
++        }
++
++        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, requestTarget(uri));
++        request.setHeader(HttpHeaders.Names.HOST, host);
++        // Ask the server to close the connection after the response: the close is what ends the wait below.
++        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
++        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
++
++        channel.write(request);
++
++        channel.getCloseFuture().awaitUninterruptibly();
++
++        clientBootstrap.releaseExternalResources();
++
++        logger.debug("Finished downloading archive file through http {}, as file: {}", uri.toString(),
++                tmpFile.getAbsolutePath());
++        try {
++            return new FileInputStream(tmpFile);
++        } catch (FileNotFoundException e) {
++            throw new ArchiveFetchException("Cannot get input stream from temporary file with s4r data ["
++                    + tmpFile.getAbsolutePath() + "]", e);
++        }
++    }
++
++    /**
++     * Returns the port to connect to: the explicit port of the URI if any, otherwise the default port of the http
++     * (80) or https (443) scheme.
++     *
++     * @throws ArchiveFetchException
++     *             if the URI has no explicit port and its scheme is neither http nor https
++     */
++    private static int resolvePort(URI uri) throws ArchiveFetchException {
++        int port = uri.getPort();
++        if (port != -1) {
++            return port;
++        }
++        String scheme = uri.getScheme();
++        if ("http".equalsIgnoreCase(scheme)) {
++            return 80;
++        }
++        if ("https".equalsIgnoreCase(scheme)) {
++            return 443;
++        }
++        throw new ArchiveFetchException("Cannot determine port for uri [" + uri.toString() + "]");
++    }
++
++    /**
++     * Builds the request target of the GET request: the still-encoded path of the URI ("/" when the path is
++     * empty), followed by the query string if there is one.
++     */
++    private static String requestTarget(URI uri) {
++        String path = uri.getRawPath();
++        if (path == null || path.length() == 0) {
++            path = "/";
++        }
++        String query = uri.getRawQuery();
++        return query == null ? path : path + "?" + query;
++    }
++
++    /**
++     * Creates the client pipeline: HTTP codec, automatic content decompression, then the handler that stores the
++     * response body into the temporary file.
++     */
++    private static final class HttpClientPipelineFactory implements ChannelPipelineFactory {
++
++        private final File tmpFile;
++
++        public HttpClientPipelineFactory(File tmpFile) {
++            this.tmpFile = tmpFile;
++        }
++
++        @Override
++        public ChannelPipeline getPipeline() throws Exception {
++            // Create a default pipeline implementation.
++            ChannelPipeline pipeline = Channels.pipeline();
++
++            pipeline.addLast("codec", new HttpClientCodec());
++
++            // Remove the following line if you don't want automatic content decompression.
++            pipeline.addLast("inflater", new HttpContentDecompressor());
++
++            pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
++            return pipeline;
++        }
++    }
++
++    /**
++     * Writes the body of the HTTP response, chunked or not, to the temporary file.
++     *
++     * see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
++     */
++    private static final class HttpResponseHandler extends SimpleChannelUpstreamHandler {
++
++        // true between a response announced as chunked and its last chunk
++        private boolean readingChunks;
++        private final FileOutputStream fos;
++
++        public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
++            this.fos = new FileOutputStream(tmpFile);
++        }
++
++        @Override
++        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
++            if (!readingChunks) {
++                HttpResponse response = (HttpResponse) e.getMessage();
++
++                if (response.isChunked()) {
++                    // The body follows as separate HttpChunk messages.
++                    readingChunks = true;
++                } else {
++                    copyContentToTmpFile(response.getContent());
++                    // The whole body came with the response: nothing more will be written to the file.
++                    fos.close();
++                }
++            } else {
++                HttpChunk chunk = (HttpChunk) e.getMessage();
++                if (chunk.isLast()) {
++                    readingChunks = false;
++                    fos.close();
++                } else {
++                    copyContentToTmpFile(chunk.getContent());
++                }
++            }
++
++        }
++
++        /** Appends the readable bytes of the given buffer to the temporary file. */
++        private void copyContentToTmpFile(ChannelBuffer content) throws IOException {
++            ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
++            ByteStreams.copy(cbis, fos);
++        }
++    }
++}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/util/RemoteFileFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/util/RemoteFileFetcher.java
index 0000000,0000000..44707a3
new file mode 100644
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/RemoteFileFetcher.java
@@@ -1,0 -1,0 +1,23 @@@
++package org.apache.s4.core.util;
++
++import java.io.InputStream;
++import java.net.URI;
++
++/**
++ * Dispatches archive fetching to the fetcher that matches the access protocol (scheme) of the URI.
++ */
++public class RemoteFileFetcher implements ArchiveFetcher {
++
++    /**
++     * Fetches the archive through a {@link FileSystemArchiveFetcher} for the file scheme, or through an
++     * {@link HttpArchiveFetcher} for the http and https schemes. Scheme comparison ignores case.
++     *
++     * @param uri
++     *            location of the archive
++     * @return the stream provided by the selected fetcher
++     * @throws ArchiveFetchException
++     *             if the scheme is not one of file, http or https, or if the selected fetcher fails
++     */
++    @Override
++    public InputStream fetch(URI uri) throws ArchiveFetchException {
++        final String protocol = uri.getScheme();
++        final ArchiveFetcher delegate;
++        if ("file".equalsIgnoreCase(protocol)) {
++            delegate = new FileSystemArchiveFetcher();
++        } else if ("http".equalsIgnoreCase(protocol) || "https".equalsIgnoreCase(protocol)) {
++            delegate = new HttpArchiveFetcher();
++        } else {
++            throw new ArchiveFetchException("Unsupported protocol " + protocol);
++        }
++        return delegate.fetch(uri);
++    }
++}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
index 3c3cd67,e0ec20f..7688c9d
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
@@@ -148,6 -157,9 +157,8 @@@ public abstract class AbstractSlidingWi
* User provided function that evaluates the whole content of the window. It must iterate across all slots. Current
* slots are passed as a parameter and the PE instance is expected to be locked so that iteration over the slots is
* safe.
+ *
- * @param result
- * result of evaluation
++ * @return result of evaluation
*/
abstract protected V evaluateWindow(Collection<T> slots);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --cc subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 2165eca,c255bcc..f78b866
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@@ -40,10 -40,15 +40,12 @@@ import org.slf4j.LoggerFactory
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-import com.beust.jcommander.converters.FileConverter;
import com.beust.jcommander.internal.Maps;
import com.google.common.base.Strings;
- import com.google.common.io.Files;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
+ /**
+ * Deploys an S4 application configuration into the cluster manager
+ */
public class Deploy extends S4ArgsBase {
static org.slf4j.Logger logger = LoggerFactory.getLogger(Deploy.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Package.java
----------------------------------------------------------------------
diff --cc subprojects/s4-tools/src/main/java/org/apache/s4/tools/Package.java
index 084b6e9,c0f3518..f759d26
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Package.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Package.java
@@@ -40,9 -43,10 +43,8 @@@ public class Package extends S4ArgsBas
// prepare gradle -P parameters, including passed gradle opts
params.add("appClass=" + packageArgs.appClass);
params.add("appName=" + packageArgs.appName.get(0));
- ExecGradle.exec(packageArgs.gradleBuildFile, "s4r", params.toArray(new String[] {}),
- packageArgs.debug);
+ ExecGradle.exec(packageArgs.gradleBuildFile, "s4r", params.toArray(new String[] {}), packageArgs.debug);
- // Explicitly shutdown the JVM since Gradle leaves non-daemon threads running that delay the termination
- System.exit(0);
} catch (Exception e) {
LoggerFactory.getLogger(Package.class).error("Cannot deploy app", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/38c2d6d7/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
----------------------------------------------------------------------
diff --cc subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
index 1d9bd45,418f01b..b109259
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@@ -227,7 -231,7 +231,7 @@@ public class Status extends S4ArgsBase
* cluster list
* @param clusterAppMap
* <cluster,app>
-- * @return
++ * @return formatted string
*/
private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
if (clusters == null || clusters.size() == 0) {