You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Jennifer Coston <Je...@raytheon.com> on 2015/12/16 15:27:32 UTC

Ignite and Spark Streaming Integration Using Java

I am trying to understand the integration of Apache Spark Streaming and
Apache Ignite but I’m running into some errors. I’m not sure where I went
wrong, so I am going to walk through all of my steps. I have worked through
two word counting tutorials provided by Apache. In the first tutorial I
created a Java project that uses Ignite to count the words in a document on
a sliding window of 5 seconds and then query the cache to determine the
most common words. I believe it created a structure that looks like this:






In the second tutorial, I created a Java project that counts the words in a
document on a rolling window of 5 seconds and saves the output to a text
document. I believe this tutorial created a structure that looks like this:



Now comes the hard part, trying to integrate the two. Based on the
documentation online, I believe the resulting combination will create a
structure that looks like this:





I started with updating the POM file by adding the dependencies for both
Spark Streaming and Ignite. However, the code doesn’t compile if you have
the dependencies for both of them.
                                                                                 
                                                                                 
                <dependency>                                                     
    			<groupId>org.apache.ignite</groupId>                          
    			<artifactId>ignite-spark</artifactId>                         
    			<version>1.4.0</version>                                      
    		</dependency>                                                       
                <dependency>                                                     
    			<groupId>org.apache.spark</groupId>                           
    			<artifactId>spark-streaming_2.10</artifactId>                 
    			<version>1.5.2</version>                                      
    		</dependency>                                                       
                                                                                 





Here is the full file
                                                                                 
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=                
    "http://www.w3.org/2001/XMLSchema-instance"                                  
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0                     
    http://maven.apache.org/xsd/maven-4.0.0.xsd">                                
    	<modelVersion>4.0.0</modelVersion>                                        
    	<groupId>wordCount_Spark</groupId>                                        
    	<artifactId>wordCount_Spark</artifactId>                                  
    	<version>0.0.1-SNAPSHOT</version>                                         
    	<name>wordCount_Spark</name>                                              
    	<description>Count words using spark</description>                        
                                                                                 
    	<dependencies>                                                            
    	<!-- Apache Ignite dependencies -->                                       
    		<dependency>                                                        
    			<groupId>org.apache.ignite</groupId>                          
    			<artifactId>ignite-core</artifactId>                          
    			<version>1.4.0</version>                                      
    		</dependency>                                                       
    		<dependency>                                                        
    			<groupId>org.apache.ignite</groupId>                          
    			<artifactId>ignite-spring</artifactId>                        
    			<version>1.4.0</version>                                      
    		</dependency>                                                       
    		<dependency>                                                        
    			<groupId>org.apache.ignite</groupId>                          
    			<artifactId>ignite-indexing</artifactId>                      
    			<version>1.4.0</version>                                      
    		</dependency>                                                       
    		<dependency>                                                        
    			<groupId>org.apache.ignite</groupId>                          
    			<artifactId>ignite-log4j</artifactId>                         
    			<version>1.4.0</version>                                      
    		</dependency>                                                       
    		<dependency>                                                        
    			<groupId>org.apache.ignite</groupId>                          
    			<artifactId>ignite-spark</artifactId>                         
    			<version>1.4.0</version>                                      
    		</dependency>                                                       
                <!-- Spark Streaming dependencies -->                            
    		<dependency>                                                        
    			<groupId>org.apache.spark</groupId>                           
    			<artifactId>spark-streaming_2.10</artifactId>                 
    			<version>1.5.2</version>                                      
    		</dependency>                                                       
    	</dependencies>                                                           
                                                                               
     	<build>                                                                   
    		<plugins>                                                           
    			<plugin>                                                      
    				<artifactId>maven-compiler-plugin</artifactId>          
    				<version>2.5.1</version>                                
    				<configuration>                                         
    					<source>1.7</source>                              
    					<target>1.7</target>                              
    					<useIncrementalCompilation>false</                
    useIncrementalCompilation>                                                   
    				</configuration>                                        
    			</plugin>                                                     
    		</plugins>                                                          
    	</build>                                                                  
    </project>                                                                   
                                                                                 




Since I would most likely eventually be wanting to integrate Ignite into an
existing Spark Streaming application, I decided to keep the spark-streaming
dependency and perform some tests. My next step was to combine the two
projects into one project and see if I could get my JUnit tests to run.
They did, so now it was time to begin the actual integration.

Since I want to add Ignite to the Spark Streaming project, I believe I need
to pass the Spark Streaming application the file to store in the Cache
through the Spark Worker. To do this, I added the JavaIgniteContext to the
same file I placed my JavaSparkContext. Here is the file before the change:
                                                                                                        
    package wordCount_test;                                                                             
                                                                                                        
    import static org.junit.Assert.*;                                                                   
                                                                                                        
    import java.io.File;                                                                                
                                                                                                        
    import org.apache.spark.api.java.JavaPairRDD;                                                       
    import org.apache.spark.api.java.JavaRDD;                                                           
    import org.apache.spark.api.java.JavaSparkContext;                                                  
    import org.junit.After;                                                                             
    import org.junit.Before;                                                                            
    import org.junit.Test;                                                                              
                                                                                                        
    import wordCount.SparkWordCount;                                                                    
                                                                                                        
    public class TestSparkWordCount {                                                                   
                                                                                                        
    	JavaSparkContext jsc;                                                                            
    	File txtFile;                                                                                    
                                                                                                        
    	@Before                                                                                          
    	public void setUp() throws Exception {                                                           
    		jsc = new JavaSparkContext("local[2]", "testSparkWordCount");                              
    		txtFile = new File("AIW_WordCount");                                                       
    		if(txtFile.exists()){                                                                      
    			txtFile.delete();                                                                    
    		}                                                                                          
    	}                                                                                                
                                                                                                        
    	@After                                                                                           
    	public void tearDown() throws Exception {                                                        
    		jsc.stop();                                                                                
    		jsc = null;                                                                                
    	}                                                                                                
                                                                                                        
    	//@SuppressWarnings("static-access")                                                             
    	@Test                                                                                            
    	public void testInit() {                                                                         
    		assertNotNull(jsc.sc());                                                                   
    	}                                                                                                
                                                                                                        
    	@Test                                                                                            
    	public void test() {                                                                             
    		SparkWordCount streamWords = new SparkWordCount();                                         
                                                                                                        
    		try {                                                                                      
    			JavaRDD<String> textFile = jsc                                                       
    					.textFile(                                                               
    "C:/Users/1116962/intersect_workspace/wordCount_SI/src/main/java/wordCount/alice-in-wonderland.txt  
    ");                                                                                                 
    			JavaPairRDD<String, Integer> wordCounts = streamWords                                
    					.countWords(textFile);                                                   
                                                                                                        
    			wordCounts.saveAsTextFile("AIW_WordCount");                                          
                                                                                                        
    			Thread.sleep(6000);                                                                  
                                                                                                        
    		} catch (InterruptedException e) {                                                         
    			e.printStackTrace();                                                                 
    		}                                                                                          
    		assertTrue(true);                                                                          
    	}                                                                                                
    }                                                                                                   
                                                                                                        




The problem appears when I try to add in the JavaIgniteContext whose
dependencies are all found in the ignite-spark dependency. Since it appears
that I can’t have them both, and I know my existing application will need
the Spark-Streaming dependency, I’m not sure how to proceed. Do you have
any suggestions?

Re: Ignite and Spark Streaming Integration Using Java

Posted by Denis Magda <dm...@gridgain.com>.
Hi Jennifer,

You received the following message
IP finder returned empty addresses list. Please check IP finder 
configuration and make sure multicast works on your network. Will retry 
every 2 secs.
because the node that was being started by you was a client node. 
Client's node join process succeeds only if there is at least one server 
node in a cluster. This is a default behavior.
In your case there was just a single client node that couldn't connect 
to a server, because there was no one, and that's why you kept getting 
this message.
Please refer to [1] for more details on client vs server nodes.

Everything starts working after you've added Ignition.start() exactly 
because you started a server node and the client node successfully 
connected to it.
However, I would highly recommend you to start the server node with the 
same configuration that is used by client - 
Ignition.start("src/main/resources/config/example-ignite.xml").
Otherwise you may have errors on start-up or face with other issues in 
runtime (like when a client node can't connect to the servers because 
they're using different IP finders).

In addition you may want to refer to [2] to get more details on IP finders.

[1] https://apacheignite.readme.io/docs/clients-vs-servers
[2] https://apacheignite.readme.io/docs/cluster-config

Regards,
Denis

On 12/21/2015 10:26 PM, jcoston wrote:
> I discovered the source of the issue. For some reason it was necessary to add
> another Ignition.start() to my countWords method. If anyone can explain why
> I would greatly appreciate it.
>
> Here is the final method:
>
>
> 	public JavaPairRDD<String, Integer> countWords(JavaRDD<String> textFile)
> throws Exception {
> 		
> 		System.out.println("Made it intoSparkWordCount. textFile = " +
> textFile.toString());
> 		
>          //Split the lines into words
> 		JavaRDD<String> words = textFile
> 				.flatMap(new FlatMapFunction<String, String>() {
> 					public Iterable<String> call(String s) {
> 						return Arrays.asList(s.split(" "));
> 					}
> 				});
> 		
> 		System.out.println("Split the lines into words");
> 		//Count the words
> 		JavaPairRDD<String, Integer> pairs = words
> 				.mapToPair(new PairFunction<String, String, Integer>() {
> 					public Tuple2<String, Integer> call(String s) {
> 						return new Tuple2<String, Integer>(s, 1);
> 					}
> 				});
> 		System.out.println("Counted the words Part 1");
> 		JavaPairRDD<String, Integer> counts = pairs
> 				.reduceByKey(new Function2<Integer, Integer, Integer>() {
> 					public Integer call(Integer a, Integer b) {
> 						return a + b;
> 					}
> 				});
> 		System.out.println("Counted the words Part 2");
> 		
> 		// Mark this cluster member as client.
> 		*Ignition.start();
> *        Ignition.setClientMode(true);
>
>          try (Ignite ignite =
> Ignition.start("src/main/resources/config/example-ignite.xml")) {
>              if (!ExamplesUtils.hasServerNodes(ignite))
>                  return counts;
>
>              // The cache is configured with sliding window holding 5 seconds
> of the streaming data.
>              IgniteCache<AffinityUuid, String> stmCache =
> ignite.getOrCreateCache(CacheConfig.wordCache());
>              System.out.println("Configured Cache " + stmCache.getName());
>
>              try (IgniteDataStreamer<AffinityUuid, String> stmr =
> ignite.dataStreamer(stmCache.getName())) {
>                  // Stream words from "alice-in-wonderland" book.
>                  while (true) {
>                      InputStream in =
> SparkIgniteWordCount.class.getResourceAsStream("alice-in-wonderland.txt");
>                       
>                      try (LineNumberReader rdr = new LineNumberReader(new
> InputStreamReader(in))) {
>                          for (String line = rdr.readLine(); line != null;
> line = rdr.readLine()) {
>                              for (String word : line.split(" "))
>                                  if (!word.isEmpty())
>                                      // Stream words into Ignite.
>                                      // By using AffinityUuid we ensure that
> identical
>                                      // words are processed on the same
> cluster node.
>                                      stmr.addData(new AffinityUuid(word),
> word);
>                          }
>                      }
>                  }
>              }
>          }
>          catch(Exception e){
>          	e.printStackTrace();
>          	throw e;
>          }finally{
>          	return counts;
>          }
> 	}
> }
>
>
>
>
> --
> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Re-Ignite-and-Spark-Streaming-Integration-Using-Java-tp2255p2268.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.


Re: Ignite and Spark Streaming Integration Using Java

Posted by jcoston <je...@raytheon.com>.
I discovered the source of the issue. For some reason it was necessary to add
another Ignition.start() to my countWords method. If anyone can explain why
I would greatly appreciate it. 

Here is the final method:


	public JavaPairRDD<String, Integer> countWords(JavaRDD<String> textFile)
throws Exception {
		
		System.out.println("Made it intoSparkWordCount. textFile = " +
textFile.toString());
		
        //Split the lines into words
		JavaRDD<String> words = textFile
				.flatMap(new FlatMapFunction<String, String>() {
					public Iterable<String> call(String s) {
						return Arrays.asList(s.split(" "));
					}
				});
		
		System.out.println("Split the lines into words");
		//Count the words
		JavaPairRDD<String, Integer> pairs = words
				.mapToPair(new PairFunction<String, String, Integer>() {
					public Tuple2<String, Integer> call(String s) {
						return new Tuple2<String, Integer>(s, 1);
					}
				});
		System.out.println("Counted the words Part 1");
		JavaPairRDD<String, Integer> counts = pairs
				.reduceByKey(new Function2<Integer, Integer, Integer>() {
					public Integer call(Integer a, Integer b) {
						return a + b;
					}
				});
		System.out.println("Counted the words Part 2");
		
		// Mark this cluster member as client.
		*Ignition.start();
*        Ignition.setClientMode(true);

        try (Ignite ignite =
Ignition.start("src/main/resources/config/example-ignite.xml")) {
            if (!ExamplesUtils.hasServerNodes(ignite))
                return counts;

            // The cache is configured with sliding window holding 5 seconds
of the streaming data.
            IgniteCache<AffinityUuid, String> stmCache =
ignite.getOrCreateCache(CacheConfig.wordCache());
            System.out.println("Configured Cache " + stmCache.getName());

            try (IgniteDataStreamer<AffinityUuid, String> stmr =
ignite.dataStreamer(stmCache.getName())) {
                // Stream words from "alice-in-wonderland" book.
                while (true) {
                    InputStream in =
SparkIgniteWordCount.class.getResourceAsStream("alice-in-wonderland.txt");
                     
                    try (LineNumberReader rdr = new LineNumberReader(new
InputStreamReader(in))) {
                        for (String line = rdr.readLine(); line != null;
line = rdr.readLine()) {
                            for (String word : line.split(" "))
                                if (!word.isEmpty())
                                    // Stream words into Ignite.
                                    // By using AffinityUuid we ensure that
identical
                                    // words are processed on the same
cluster node.
                                    stmr.addData(new AffinityUuid(word),
word);
                        }
                    }
                }
            } 
        }
        catch(Exception e){
        	e.printStackTrace();
        	throw e;
        }finally{
        	return counts;
        }
	}
}




--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Re-Ignite-and-Spark-Streaming-Integration-Using-Java-tp2255p2268.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Ignite and Spark Streaming Integration Using Java

Posted by Jennifer Coston <Je...@raytheon.com>.
I looked into this further and I'm not sure why the IP finder is returning
an empty list. The configuration file has specified that it should use
Automatic discovery. I know that this should be working, since the code
runs on its own. Is there an additional configuration that needs to be done
since I am using a JUnit test case?

<property name="discoverySpi">
            <bean class=
"org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic
discovery that can be used
                        instead os static IP based discovery. For
information on all options refer
                        to our documentation:
http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based
discovery of initial nodes. -->
                    <!--<bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class=
"org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"
>
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace
with actual host IP address. -->
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>

Re: Ignite and Spark Streaming Integration Using Java

Posted by Jennifer Coston <Je...@raytheon.com>.
Thank you Andrey, it is running now.

How long does it take to pass all of the tests? The program has been
running for 10 minutes and keeps giving me the message: IP finder returned
empty addresses list. Please check IP finder configuration and make sure
multicast works on your network. Will retry every 2 secs.

Does this have to do with the configuration you mentioned earlier?

Thanks,
-Jennifer

Re: Ignite and Spark Streaming Integration Using Java

Posted by Andrey Gura <ag...@gridgain.com>.
Jennifer,

There is no need in ExampleNodes for this test.

On Mon, Dec 21, 2015 at 6:03 PM, Jennifer Coston <
Jennifer.Coston@raytheon.com> wrote:

> Andrey,
>
> Thank you for the feedback. I have updated my code with the changes that
> you provided, but I am seeing the following errors when I try to run it.
> When I run the test, I first start two ExampleNodes and then run the JUnit
> test. Is that still the proper procedure?
>
> class org.apache.ignite.IgniteException: Failed to start manager:
> GridManagerAdapter [enabled=true,
> name=org.apache.ignite.internal.managers.discovery.GridDiscoveryManager]
> at
> org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:881)
> at org.apache.ignite.Ignition.start(Ignition.java:306)
> at
> wordCount_test.TestSparkIgniteWordCount.setUp(TestSparkIgniteWordCount.java:40)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.junit.internal.runners.BeforeAndAfterRunner.invokeMethod(BeforeAndAfterRunner.java:74)
> at
> org.junit.internal.runners.BeforeAndAfterRunner.runBefores(BeforeAndAfterRunner.java:50)
> at
> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:33)
> at
> org.junit.internal.runners.TestMethodRunner.runMethod(TestMethodRunner.java:75)
> at
> org.junit.internal.runners.TestMethodRunner.run(TestMethodRunner.java:45)
> at
> org.junit.internal.runners.TestClassMethodsRunner.invokeTestMethod(TestClassMethodsRunner.java:71)
> at
> org.junit.internal.runners.TestClassMethodsRunner.run(TestClassMethodsRunner.java:35)
> at
> org.junit.internal.runners.TestClassRunner$1.runUnprotected(TestClassRunner.java:42)
> at
> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
> at org.junit.internal.runners.TestClassRunner.run(TestClassRunner.java:52)
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: class org.apache.ignite.IgniteCheckedException: Failed to start
> manager: GridManagerAdapter [enabled=true,
> name=org.apache.ignite.internal.managers.discovery.GridDiscoveryManager]
> at
> org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1488)
> at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:908)
> at
> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:1617)
> at
> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1484)
> at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:965)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:494)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:470)
> at org.apache.ignite.Ignition.start(Ignition.java:303)
> ... 21 more
> Caused by: class org.apache.ignite.IgniteCheckedException: Remote node has
> peer class loading enabled flag different from local [locId8=3b7130ea,
> locPeerClassLoading=false, rmtId8=d92e93a6, rmtPeerClassLoading=true,
> rmtAddrs=[2601:151:c200:6d4:0:0:0:ef63/0:0:0:0:0:0:0:1,
> DULL-AI356580.us.ray.com/127.0.0.1,
> DULL-AI356580.us.ray.com/147.25.241.114, /2002:9319:f172:0:0:0:9319:f172,
> /2601:151:c200:6d4:0:0:0:ef63]]
> at
> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.checkAttributes(GridDiscoveryManager.java:1028)
> at
> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:686)
> at
> org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1485)
> ... 28 more
>
>
> Here is my updated JUnit Test:
>
> package wordCount_test;
>
> import static org.junit.Assert.*;
>
> import java.io.File;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.LineNumberReader;
>
> import org.apache.ignite.lang.IgniteOutClosure;
> import org.apache.ignite.spark.JavaIgniteContext;
> import org.apache.ignite.cache.affinity.AffinityUuid;
> import org.apache.ignite.configuration.*;
> import org.apache.ignite.internal.util.typedef.F;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteDataStreamer;
> import org.apache.ignite.Ignition;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
>
> import testWordCount.SparkIgniteWordCount;
> import testWordCount.CacheConfig;
> import testWordCount.ExamplesUtils;
>
> public class TestSparkIgniteWordCount {
>
> JavaSparkContext jsc;
> JavaIgniteContext<String, Integer> jic;
> File txtFile;
>
>
> //In order to run this test, you must first start two ExampleNodeStartup
> Servers
> @Before
> public void setUp() throws Exception {
> Ignite ignite = Ignition.start();
>     jsc = new JavaSparkContext("local[2]", "testSparkWordCount");
>     jic = new JavaIgniteContext<>(jsc, new
> IgniteOutClosure<IgniteConfiguration>(){
>     @Override public IgniteConfiguration apply() {
>             return new IgniteConfiguration();
>         }
>     });
>
>     txtFile = new File("AIW_WordCount");
>     if(txtFile.exists()){
>         txtFile.delete();
>     }
>
> }
>
> @After
> public void tearDown() throws Exception {
>     if(jsc != null){
>         jsc.stop();
>         jsc = null;
>     }
>     if(jic != null){
>     jic = null;
>     }
>     Ignition.stopAll(true);
> }
>
> @Test
> public void testSparkInit() {
>     assertNotNull(jsc.sc());
> }
>
> @Test
> public void testIgniteInit(){
> assertNotNull(jic);
> }
>
> @Test
> public void testSparkWordCount() {
> SparkIgniteWordCount streamWords = new SparkIgniteWordCount();
>
> try {
> JavaRDD<String> textFile = jsc
> .textFile("alice-in-wonderland.txt");
> JavaPairRDD<String, Integer> wordCounts = streamWords
> .countWords(textFile);
>
> wordCounts.saveAsTextFile("AIW_WordCount");
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> assertTrue(true);
> }
>
> }
>
>
>    Thank you,
>    Jennifer
>
>


-- 
Andrey Gura
GridGain Systems, Inc.
www.gridgain.com

Re: Ignite and Spark Streaming Integration Using Java

Posted by Jennifer Coston <Je...@raytheon.com>.
Andrey,

Thank you for the feedback. I have updated my code with the changes that
you provided, but I am seeing the following errors when I try to run it.
When I run the test, I first start two ExampleNodes and then run the JUnit
test. Is that still the proper procedure?

class org.apache.ignite.IgniteException: Failed to start manager:
GridManagerAdapter [enabled=true,
name=org.apache.ignite.internal.managers.discovery.GridDiscoveryManager]
	at org.apache.ignite.internal.util.IgniteUtils.convertException
(IgniteUtils.java:881)
	at org.apache.ignite.Ignition.start(Ignition.java:306)
	at wordCount_test.TestSparkIgniteWordCount.setUp
(TestSparkIgniteWordCount.java:40)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.junit.internal.runners.BeforeAndAfterRunner.invokeMethod
(BeforeAndAfterRunner.java:74)
	at org.junit.internal.runners.BeforeAndAfterRunner.runBefores
(BeforeAndAfterRunner.java:50)
	at org.junit.internal.runners.BeforeAndAfterRunner.runProtected
(BeforeAndAfterRunner.java:33)
	at org.junit.internal.runners.TestMethodRunner.runMethod
(TestMethodRunner.java:75)
	at org.junit.internal.runners.TestMethodRunner.run
(TestMethodRunner.java:45)
	at org.junit.internal.runners.TestClassMethodsRunner.invokeTestMethod
(TestClassMethodsRunner.java:71)
	at org.junit.internal.runners.TestClassMethodsRunner.run
(TestClassMethodsRunner.java:35)
	at org.junit.internal.runners.TestClassRunner$1.runUnprotected
(TestClassRunner.java:42)
	at org.junit.internal.runners.BeforeAndAfterRunner.runProtected
(BeforeAndAfterRunner.java:34)
	at org.junit.internal.runners.TestClassRunner.run
(TestClassRunner.java:52)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run
(JUnit4TestReference.java:50)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run
(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests
(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests
(RemoteTestRunner.java:675)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run
(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main
(RemoteTestRunner.java:192)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to start
manager: GridManagerAdapter [enabled=true,
name=org.apache.ignite.internal.managers.discovery.GridDiscoveryManager]
	at org.apache.ignite.internal.IgniteKernal.startManager
(IgniteKernal.java:1488)
	at org.apache.ignite.internal.IgniteKernal.start
(IgniteKernal.java:908)
	at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0
(IgnitionEx.java:1617)
	at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start
(IgnitionEx.java:1484)
	at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:965)
	at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:494)
	at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:470)
	at org.apache.ignite.Ignition.start(Ignition.java:303)
	... 21 more
Caused by: class org.apache.ignite.IgniteCheckedException: Remote node has
peer class loading enabled flag different from local [locId8=3b7130ea,
locPeerClassLoading=false, rmtId8=d92e93a6, rmtPeerClassLoading=true,
rmtAddrs=[2601:151:c200:6d4:0:0:0:ef63/0:0:0:0:0:0:0:1,
DULL-AI356580.us.ray.com/127.0.0.1,
DULL-AI356580.us.ray.com/147.25.241.114, /2002:9319:f172:0:0:0:9319:f172, /2601:151:c200:6d4:0:0:0:ef63]]
	at
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.checkAttributes
(GridDiscoveryManager.java:1028)
	at
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start
(GridDiscoveryManager.java:686)
	at org.apache.ignite.internal.IgniteKernal.startManager
(IgniteKernal.java:1485)
	... 28 more


Here is my updated JUnit Test:

package wordCount_test;

import static org.junit.Assert.*;

import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;

import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.cache.affinity.AffinityUuid;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import testWordCount.SparkIgniteWordCount;
import testWordCount.CacheConfig;
import testWordCount.ExamplesUtils;

public class TestSparkIgniteWordCount {

JavaSparkContext jsc;
JavaIgniteContext<String, Integer> jic;
File txtFile;


//In order to run this test, you must first start two ExampleNodeStartup
Servers
@Before
public void setUp() throws Exception {
	Ignite ignite = Ignition.start();
    jsc = new JavaSparkContext("local[2]", "testSparkWordCount");
    jic = new JavaIgniteContext<>(jsc, new
IgniteOutClosure<IgniteConfiguration>(){
    	@Override public IgniteConfiguration apply() {
            return new IgniteConfiguration();
        }
    });

    txtFile = new File("AIW_WordCount");
    if(txtFile.exists()){
        txtFile.delete();
    }

}

@After
public void tearDown() throws Exception {
    if(jsc != null){
        jsc.stop();
        jsc = null;
    }
    if(jic != null){
    	jic = null;
    }
    Ignition.stopAll(true);
}

@Test
public void testSparkInit() {
    assertNotNull(jsc.sc());
}

@Test
public void testIgniteInit(){
	assertNotNull(jic);
}

@Test
public void testSparkWordCount() {
	SparkIgniteWordCount streamWords = new SparkIgniteWordCount();

	try {
		JavaRDD<String> textFile = jsc
				.textFile("alice-in-wonderland.txt");
		JavaPairRDD<String, Integer> wordCounts = streamWords
				.countWords(textFile);

		wordCounts.saveAsTextFile("AIW_WordCount");
	} catch (Exception e) {
		e.printStackTrace();
	}

	assertTrue(true);
}

}

Thank you,
Jennifer

Re: Ignite and Spark Streaming Integration Using Java

Posted by Andrey Gura <ag...@gridgain.com>.
Jennifer,

I have another problems in this code:

1. igniteConfig field isn't initialized that leads to NullPointerException.
2. The second parameter of JavaIgniteContextConstructor should be
configuration URL instead of igniteConfiguration.getLocalHost() result.
3. There is no any Ignite node started


I modified code of TestSparkIgniteWordCount class and now it works:

public class TestSparkIgniteWordCount {

    JavaSparkContext jsc;
    JavaIgniteContext<String, Integer> jic;
    File txtFile;

    @Before
    public void setUp() throws Exception {
        Ignite ignite = Ignition.start();
        jsc = new JavaSparkContext("local[2]", "testSparkWordCount");
        jic = new JavaIgniteContext<>(jsc, new
IgniteOutClosure<IgniteConfiguration>() {
            @Override public IgniteConfiguration apply() {
                return new IgniteConfiguration();
            }
        });
        txtFile = new File("AIW_WordCount");
        if (txtFile.exists()) {
            txtFile.delete();
        }

    }

    @After
    public void tearDown() throws Exception {
        if (jsc != null) {
            jsc.stop();
            jsc = null;
        }
        if (jic != null) {
            jic = null;
        }
        Ignition.stopAll(true);
    }

    @Test
    public void testSparkInit() {
        assertNotNull(jsc.sc());
    }

    @Test
    public void testIgniteInit() {
        assertNotNull(jic);
    }

It can take some time to pass tests because by default Ignite uses
multicast discovery SPI. You can provide own configuration with static
discovery SPI (see example-default.xml configuration file in Ignite sources)


On Mon, Dec 21, 2015 at 3:53 PM, jcoston <je...@raytheon.com>
wrote:

> Andry,
>
> I think you are looking at the wrong project. I posted three of them. The
> first, wordCount_Spark, only uses Spark Streaming; the second,
> wordCount_Ignite, only uses Ignite; and the third testWordCounts is where I
> try to combine the two. I have made some modifications to the combined
> project and have attached the latest version. I think I have
> JavaIgniteContext working, but I'm having trouble running my JUnit test
> case. It keeps throwing the following error in SparkIgniteWordCount when it
> reaches line 66:
>
> INFO GenericApplicationContext: Refreshing
> org.springframework.context.support.GenericApplicationContext@66243c25:
> startup date [Mon Dec 21 07:46:15 EST 2015]; root of context hierarchy
> class org.apache.ignite.IgniteException: Default grid instance has already
> been started.
>         at
>
> org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:881)
>
> Can you please take a look at the code and the configuration files? I'm
> thinking I need to specify the name for the additional clients/servers as I
> start them. Is that correct? How do I do that in the Spring XML file?
>
> Thank you,
> Jennifer
>
>
>
> --
> View this message in context:
> http://apache-ignite-users.70518.x6.nabble.com/Re-Ignite-and-Spark-Streaming-Integration-Using-Java-tp2255p2262.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>



-- 
Andrey Gura
GridGain Systems, Inc.
www.gridgain.com

Re: Ignite and Spark Streaming Integration Using Java

Posted by jcoston <je...@raytheon.com>.
Andry,

I think you are looking at the wrong project. I posted three of them. The
first, wordCount_Spark, only uses Spark Streaming; the second,
wordCount_Ignite, only uses Ignite; and the third testWordCounts is where I
try to combine the two. I have made some modifications to the combined
project and have attached the latest version. I think I have
JavaIgniteContext working, but I'm having trouble running my JUnit test
case. It keeps throwing the following error in SparkIgniteWordCount when it
reaches line 66:

INFO GenericApplicationContext: Refreshing
org.springframework.context.support.GenericApplicationContext@66243c25:
startup date [Mon Dec 21 07:46:15 EST 2015]; root of context hierarchy
class org.apache.ignite.IgniteException: Default grid instance has already
been started.
	at
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:881)

Can you please take a look at the code and the configuration files? I'm
thinking I need to specify the name for the additional clients/servers as I
start them. Is that correct? How do I do that in the Spring XML file?

Thank you,
Jennifer



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Re-Ignite-and-Spark-Streaming-Integration-Using-Java-tp2255p2262.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Ignite and Spark Streaming Integration Using Java

Posted by Andrey Gura <ag...@gridgain.com>.
Jennifer,

I ran wordCount_Spark example that you attached and it works. I see that
TestSparkWordCount class doesn't use JavaIgniteContex so I can't reproduce
your problem.

Could you please provide code that has described problem?

On Fri, Dec 18, 2015 at 7:31 PM, jcoston <je...@raytheon.com>
wrote:

> Sorry about that, my system removed the attachments. Here they are:
>
> testWordCount.zip
> <
> http://apache-ignite-users.70518.x6.nabble.com/file/n2257/testWordCount.zip
> >
> wordCount_Ignite.zip
> <
> http://apache-ignite-users.70518.x6.nabble.com/file/n2257/wordCount_Ignite.zip
> >
> wordCount_Spark.zip
> <
> http://apache-ignite-users.70518.x6.nabble.com/file/n2257/wordCount_Spark.zip
> >
>
>
>
> --
> View this message in context:
> http://apache-ignite-users.70518.x6.nabble.com/Re-Ignite-and-Spark-Streaming-Integration-Using-Java-tp2255p2257.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>



-- 
Andrey Gura
GridGain Systems, Inc.
www.gridgain.com

Re: Ignite and Spark Streaming Integration Using Java

Posted by jcoston <je...@raytheon.com>.
Sorry about that, my system removed the attachments. Here they are:

testWordCount.zip
<http://apache-ignite-users.70518.x6.nabble.com/file/n2257/testWordCount.zip>  
wordCount_Ignite.zip
<http://apache-ignite-users.70518.x6.nabble.com/file/n2257/wordCount_Ignite.zip>  
wordCount_Spark.zip
<http://apache-ignite-users.70518.x6.nabble.com/file/n2257/wordCount_Spark.zip>  



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Re-Ignite-and-Spark-Streaming-Integration-Using-Java-tp2255p2257.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Ignite and Spark Streaming Integration Using Java

Posted by Jennifer Coston <Je...@raytheon.com>.
Hi AG,

I got the code to compile with the help of Val, but now I'm having trouble
combining my two tutorials. We have an existing system that uses Spark
Streaming, but we would like to add in the functionality of Ignite so that
we can have a cache, and save states for the Spark Workers. As a proof of
concept, I am trying to integrate two tutorials for counting words.  In the
first tutorial I created a Java project that uses Ignite to count the words
in a document on a sliding window of 5 seconds and then query the cache to
determine the most common words. In the second tutorial, I created a Java
project that counts the words in a document on a rolling window of 5
seconds and saves the output to a text document.

I'm at the point now where I am trying to combine the two projects. This
may not be the right approach so please correct me if I am wrong, but I
believe that the Spark Workers will need to talk to the Ignite Nodes who
then in turn talk to the Ignite cache. If I'm understanding things
correctly then in my JUnit test, I need to create a JavaIgniteContext at
the same time I create my JavaSparkContext. I would then call my Spark word
counter and pass it the Ignite Context. The Spark worker would then stream
the data into the ignite cache and use ignite to query the words. Does that
sound correct?

I'm still a little confused on how the whole integration part is supposed
to work. So, I was trying to run one of the tests that I saw in the source
code, JavaIngiteRDDSelfTest, but it sounds like I need to have the source
code running to that. I've attached all three of my eclipse projects if you
want to look at them.

Any help/guidance you can provide would be greatly appreciated!

(See attached file: wordCount_Ignite.zip) (See attached file:
wordCount_Spark.zip)(See attached file: testWordCount.zip)

Thank you!
Jennifer




From:	Alexey Goncharuk <al...@gmail.com>
To:	user@ignite.apache.org
Date:	12/18/2015 10:46 AM
Subject:	Re: Ignite and Spark Streaming Integration Using Java



Hello Jennifer,

I created a sample Spark stream words example and combined it with the code
you have provided (both Java and POM), and it worked fine for me. Can you
share the specific compile error/exception you see with us?

Thanks,
AG

Re: Ignite and Spark Streaming Integration Using Java

Posted by Alexey Goncharuk <al...@gmail.com>.
Hello Jennifer,

I created a sample Spark stream words example and combined it with the code
you have provided (both Java and POM), and it worked fine for me. Can you
share the specific compile error/exception you see with us?

Thanks,
AG