You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/02/10 23:37:39 UTC
[1/2] cxf git commit: [CXF-6618] Prototyping a basic Spark streaming
demo
Repository: cxf
Updated Branches:
refs/heads/master c857aa32e -> c6fb2db07
[CXF-6618] Prototyping a basic Spark streaming demo
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e6f07226
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e6f07226
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e6f07226
Branch: refs/heads/master
Commit: e6f0722615190861192972a51b3986b34efd1f0e
Parents: 5f038c2
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Wed Feb 10 22:36:55 2016 +0000
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Wed Feb 10 22:36:55 2016 +0000
----------------------------------------------------------------------
.../main/release/samples/jax_rs/spark/pom.xml | 90 ++++++++++++
.../demo/jaxrs/server/InputStreamReceiver.java | 45 ++++++
.../src/main/java/demo/jaxrs/server/Server.java | 49 +++++++
.../demo/jaxrs/server/SparkStreamingOutput.java | 139 +++++++++++++++++++
.../demo/jaxrs/server/StreamingService.java | 116 ++++++++++++++++
5 files changed, 439 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/pom.xml b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
new file mode 100644
index 0000000..2971162
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>jax_rs_spark</artifactId>
+ <name>JAX-RS Spark Streaming Demo</name>
+ <description>JAX-RS Spark Streaming Demo</description>
+ <parent>
+ <groupId>org.apache.cxf.samples</groupId>
+ <artifactId>cxf-samples</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ <relativePath>../..</relativePath>
+ </parent>
+ <properties>
+ <cxf.version>${project.version}</cxf.version>
+ <httpclient.version>3.1</httpclient.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-jetty</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ <version>1.6.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.7.0.Final</version>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>repo</id>
+ <url>http://mvnrepository.com/artifact</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <mainClass>demo.jaxrs.server.Server</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
new file mode 100644
index 0000000..de3ddf5
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
@@ -0,0 +1,45 @@
+package demo.jaxrs.server;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class InputStreamReceiver extends Receiver<String> {
+
+ private static final long serialVersionUID = 1L;
+ private List<String> inputStrings = new LinkedList<String>();
+
+ public InputStreamReceiver(InputStream is) {
+ super(StorageLevel.MEMORY_ONLY());
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ String userInput = null;
+ while ((userInput = readLine(reader)) != null) {
+ inputStrings.add(userInput);
+ }
+ }
+ @Override
+ public void onStart() {
+ super.store(inputStrings.iterator());
+ }
+
+ private String readLine(BufferedReader reader) {
+ try {
+ return reader.readLine();
+ } catch (IOException ex) {
+ throw new WebApplicationException(500);
+ }
+ }
+ @Override
+ public void onStop() {
+ // complete
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
new file mode 100644
index 0000000..31a9d04
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package demo.jaxrs.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.spark.SparkConf;
+
+
+public class Server {
+
+ protected Server() throws Exception {
+ SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect");
+
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setResourceClasses(StreamingService.class);
+ sf.setResourceProvider(StreamingService.class,
+ new SingletonResourceProvider(new StreamingService(sparkConf)));
+ sf.setAddress("http://localhost:9000/");
+
+ sf.create();
+ }
+
+ public static void main(String args[]) throws Exception {
+ new Server();
+ Thread.sleep(60 * 60 * 1000);
+ System.out.println("Server ready...");
+ System.out.println("Server exiting");
+ System.exit(0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
new file mode 100644
index 0000000..6be0594
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.scheduler.StreamingListener;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
+
+public class SparkStreamingOutput implements StreamingOutput {
+ private JavaPairDStream<String, Integer> wordCounts;
+ private JavaStreamingContext jssc;
+ private boolean sparkDone;
+ private boolean batchCompleted;
+ public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream<String, Integer> wordCounts) {
+ this.jssc = jssc;
+ this.wordCounts = wordCounts;
+ }
+
+ @Override
+ public void write(final OutputStream output) throws IOException, WebApplicationException {
+ wordCounts.foreachRDD(new OutputFunction(output));
+ jssc.addStreamingListener(new SparkStreamingListener());
+ jssc.start();
+ awaitTermination();
+ jssc.stop(false);
+ jssc.close();
+ }
+
+ private synchronized void awaitTermination() {
+ while (!sparkDone) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ private synchronized void releaseStreamingContext() {
+ if (batchCompleted) {
+ sparkDone = true;
+ notify();
+ }
+ }
+
+ private synchronized void setBatchCompleted() {
+ batchCompleted = true;
+ }
+
+ private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+ private OutputStream os;
+ public OutputFunction(OutputStream os) {
+ this.os = os;
+ }
+ @Override
+ public void call(JavaPairRDD<String, Integer> rdd) {
+ for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
+ String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
+ try {
+ os.write(value.getBytes());
+ os.flush();
+ } catch (IOException ex) {
+ throw new WebApplicationException();
+ }
+ }
+ releaseStreamingContext();
+ }
+
+ }
+ private class SparkStreamingListener implements StreamingListener {
+
+ @Override
+ public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+ setBatchCompleted();
+ }
+
+ @Override
+ public void onBatchStarted(StreamingListenerBatchStarted arg0) {
+ }
+
+ @Override
+ public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) {
+ }
+
+ @Override
+ public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted arg0) {
+ }
+
+ @Override
+ public void onOutputOperationStarted(StreamingListenerOutputOperationStarted arg0) {
+ }
+
+ @Override
+ public void onReceiverError(StreamingListenerReceiverError arg0) {
+ }
+
+ @Override
+ public void onReceiverStarted(StreamingListenerReceiverStarted arg0) {
+ }
+
+ @Override
+ public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
new file mode 100644
index 0000000..4e2e8ff
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import java.io.InputStream;
+import java.util.Arrays;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import scala.Tuple2;
+
+
+@Path("/")
+public class StreamingService {
+ private SparkConf sparkConf;
+ public StreamingService(SparkConf sparkConf) {
+ this.sparkConf = sparkConf;
+ }
+
+ @POST
+ @Path("/stream")
+ @Consumes("text/plain")
+ @Produces("text/plain")
+ public StreamingOutput getStream(InputStream is) {
+ try {
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+ JavaReceiverInputDStream<String> receiverStream =
+ jssc.receiverStream(new InputStreamReceiver(is));
+ return new SparkStreamingOutput(jssc,
+ createOutputDStream(receiverStream));
+ } catch (Exception ex) {
+ if (ex instanceof SparkException) {
+ // org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
+ // To ignore this error, set spark.driver.allowMultipleContexts = true
+ throw new WebApplicationException(Response.status(503).header("Retry-After", "60").build());
+ } else {
+ throw new WebApplicationException(ex);
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static JavaPairDStream<String, Integer> createOutputDStream(JavaReceiverInputDStream<String> receiverStream) {
+ final JavaDStream<String> words = receiverStream.flatMap(
+ new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Arrays.asList(x.split(" "));
+ }
+ });
+ final JavaPairDStream<String, Integer> pairs = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ });
+ return pairs.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+ }
+
+ //new MyReceiverInputDStream(jssc.ssc(),
+ // scala.reflect.ClassTag$.MODULE$.apply(String.class));
+// public static class MyReceiverInputDStream extends ReceiverInputDStream<String> {
+//
+// public MyReceiverInputDStream(StreamingContext ssc_, ClassTag<String> evidence$1) {
+// super(ssc_, evidence$1);
+// }
+//
+// @Override
+// public Receiver<String> getReceiver() {
+// return new InputStreamReceiver(is);
+// }
+//
+// }
+}
[2/2] cxf git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/cxf
Posted by se...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/cxf
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c6fb2db0
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c6fb2db0
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c6fb2db0
Branch: refs/heads/master
Commit: c6fb2db07a7ebc2bebe2220d6211f6fac357e603
Parents: e6f0722 c857aa3
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Wed Feb 10 22:37:26 2016 +0000
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Wed Feb 10 22:37:26 2016 +0000
----------------------------------------------------------------------
.../cxf/sts/operation/TokenIssueOperation.java | 20 +-
.../cxf/sts/rest/RESTSecurityTokenService.java | 11 +-
.../sts/rest/RESTSecurityTokenServiceImpl.java | 29 +-
.../sts/operation/IssueJWTRealmUnitTest.java | 34 +-
.../cxf/sts/operation/IssueJWTUnitTest.java | 10 +
.../cxf/systest/sts/rest/RESTUnitTest.java | 758 ------------
.../cxf/systest/sts/rest/STSRESTTest.java | 1079 ++++++++++++++++++
.../cxf/systest/sts/rest/cxf-rest-sts.xml | 1 +
8 files changed, 1136 insertions(+), 806 deletions(-)
----------------------------------------------------------------------