Posted to user@ignite.apache.org by Jennifer Coston <Je...@raytheon.com> on 2015/12/16 17:45:58 UTC

Error when trying to integrate Apache Ignite and Spark Streaming


Hello,

I am trying to integrate Ignite into an existing Spark Streaming project,
written in Java, that counts the words in a text file. I have added
dependencies for both ignite-spark and spark-streaming to my POM:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>wordCount_Spark</groupId>
	<artifactId>wordCount_Spark</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>wordCount_Spark</name>
	<description>Count words using spark</description>

	<dependencies>
	<!-- Apache Ignite dependencies -->
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-core</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spring</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-indexing</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-log4j</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spark</artifactId>
			<version>1.4.0</version>
		</dependency>
		<!-- Spark Streaming dependencies -->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.5.2</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.0</version>
			<!--  <scope>test</scope> -->
		</dependency>
	</dependencies>

 	<build>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.5.1</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
					<useIncrementalCompilation>false</useIncrementalCompilation>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Here is my JUnit test:

package wordCount_test;

import static org.junit.Assert.*;

import java.io.File;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import wordCount.SparkWordCount;

public class TestSparkWordCount {

	JavaSparkContext jsc;
	File txtFile;

	@Before
	public void setUp() throws Exception {
		jsc = new JavaSparkContext("local[2]", "testSparkWordCount");
		txtFile = new File("AIW_WordCount");
		if(txtFile.exists()){
			txtFile.delete();
		}

	}

	@After
	public void tearDown() throws Exception {
		if(jsc != null){
			jsc.stop();
			jsc = null;
		}
	}

	@Test
	public void testSparkInit() {
		assertNotNull(jsc.sc());
	}
}

However, I see the following error when I run the test:

java.lang.ClassNotFoundException: org.spark_project.protobuf.GeneratedMessage
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:270)
	at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(DynamicAccess.scala:67)
	at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(DynamicAccess.scala:66)
	at scala.util.Try$.apply(Try.scala:191)
	at akka.actor.ReflectiveDynamicAccess.getClassFor(DynamicAccess.scala:66)
	at akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:181)
	at akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:181)
	at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:728)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
	at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:727)
	at akka.serialization.Serialization.<init>(Serialization.scala:181)
	at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
	at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
	at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711)
	at akka.actor.ExtensionId$class.apply(Extension.scala:79)
	at akka.serialization.SerializationExtension$.apply(SerializationExtension.scala:12)
	at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:175)
	at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
	at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
	at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
	at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
	at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
	at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
	at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
	at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
	at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:269)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:272)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:154)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:169)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:67)
	at wordCount_test.TestSparkWordCount.setUp(TestSparkWordCount.java:25)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)


If I remove the ignite-spark dependency, the test passes. Any help or
guidance you can provide would be greatly appreciated.

Thank you,
Jennifer

Re: Error when trying to integrate Apache Ignite and Spark Streaming

Posted by vkulichenko <va...@gmail.com>.
Jennifer,

You can find example configurations here:
https://github.com/apache/ignite/tree/master/examples/config

The actual content depends on which discovery mechanism you want to use,
whether you need events, caches, etc.

If you want to start with all defaults, you can use Ignite's default
configuration (the IGNITE_HOME environment variable should be set):

JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc,
"config/default-config.xml");

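For reference, the default-config.xml shipped with Ignite is essentially an
empty IgniteConfiguration bean, so a minimal Spring file can be as small as
the sketch below (all defaults assumed; discovery, cache, and event settings
would go inside the bean):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- An Ignite node started from this file uses all default settings. -->
    <bean class="org.apache.ignite.configuration.IgniteConfiguration"/>
</beans>
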
Let us know if it helps.

-Val






Re: Error when trying to integrate Apache Ignite and Spark Streaming

Posted by jcoston <je...@raytheon.com>.
Val,

Do you happen to have an example of what needs to be in the Spring config
file?

Thanks,
-Jennifer




Re: Error when trying to integrate Apache Ignite and Spark Streaming

Posted by vkulichenko <va...@gmail.com>.
Hi Jennifer,

IgniteConfigProvider is just an implementation of IgniteOutClosure used in
Ignite's own tests. If you're using the binary distribution, which doesn't
include the tests, you don't have this class either.

The second parameter is either a URL to a configuration XML file or an
implementation of IgniteOutClosure that acts as a factory and creates a new
instance of IgniteConfiguration. Use the latter if you want to provide the
configuration in code rather than in a separate Spring-based file.
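
For illustration, a minimal sketch of such a factory, assuming the Ignite 1.4
API (the class name IgniteConfigProvider and the body below are illustrative,
not the test class shipped with Ignite):

import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteOutClosure;

public class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
    @Override public IgniteConfiguration apply() {
        // Build the configuration in code; customize discovery, caches,
        // events, etc. here before returning it.
        return new IgniteConfiguration();
    }
}

It would then be passed as the second constructor argument:
new JavaIgniteContext<>(sc, new IgniteConfigProvider()).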

-Val




Re: Error when trying to integrate Apache Ignite and Spark Streaming

Posted by jcoston <je...@raytheon.com>.
Thank you, Val! That fixed it.

I have one more question for you. I am trying to initialize a
JavaIgniteContext in my test case. All of the examples I found online contain
the following:
public void testStoreDataToIgnite() throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
    JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc,
            new IgniteConfigProvider());
    // ...
}
However, when I pass an IgniteConfigProvider to the JavaIgniteContext,
Eclipse gives me an error saying that IgniteConfigProvider cannot be resolved
to a type. From the code it appears that the constructor expects an
IgniteOutClosure or a String. What value should I pass as the second
parameter when I initialize the JavaIgniteContext?

P.S. If there is API documentation for JavaIgniteContext and JavaIgniteRDD
that you can direct me to, that would be very helpful.

-Jennifer




Re: Error when trying to integrate Apache Ignite and Spark Streaming

Posted by vkulichenko <va...@gmail.com>.
Hi Jennifer,

I responded on Stack Overflow:
http://stackoverflow.com/questions/34320932/class-not-found-error-when-integrating-spark-streaming-and-apache-ignite

-Val


