You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by sl...@apache.org on 2008/03/14 23:29:57 UTC
svn commit: r637297 [2/6] - in /incubator/tuscany/java/sca:
demos/workpool-distributed/ demos/workpool-distributed/src/
demos/workpool-distributed/src/main/
demos/workpool-distributed/src/main/java/
demos/workpool-distributed/src/main/java/node/ demos/...
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package node;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.daemon.Daemon;
+import org.apache.commons.daemon.DaemonContext;
+import org.apache.commons.daemon.DaemonController;
+import org.apache.tuscany.sca.assembly.Composite;
+import org.apache.tuscany.sca.assembly.Service;
+import org.apache.tuscany.sca.contribution.Contribution;
+import org.apache.tuscany.sca.contribution.DeployedArtifact;
+import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
+import org.apache.tuscany.sca.domain.SCADomain;
+import org.apache.tuscany.sca.node.NodeException;
+import org.apache.tuscany.sca.node.NodeManagerInitService;
+import org.apache.tuscany.sca.node.SCANode;
+import org.apache.tuscany.sca.node.SCANodeFactory;
+import org.apache.tuscany.sca.node.impl.SCANodeImpl;
+import java.net.URI;
+
+import workpool.WorkerManager;
+import workpool.WorkerManagerImpl;
+import workpool.WorkpoolManager;
+import workpool.WorkpoolService;
+import workpool.WorkpoolServiceImpl;
+
+/**
+ * This daemon shows how to run a distributed SCA node. In this case a
+ * workpool node has been constructed specifically for running the Workpool
+ * composite. Internally it creates a representation of a node and associates a
+ * distributed domain with the node. This separation is made so that different
+ * implementations of the distributed domain can be provided.
+ */
+public class WorkpoolDaemon implements Daemon, Runnable {
+ // Domain name passed to createSCANode(); taken from the first daemon argument.
+ private String domainName;
+ // Node name; used as the contribution name, as the classpath resource
+ // directory (nodeName + "/") and to select the per-node setup in start().
+ private String nodeName;
+ // Third daemon argument; handed to every TestJob submitted by nodeA.
+ private long iterations;
+ // Fourth daemon argument; number of TestJob instances nodeA submits.
+ private long jobsNo;
+ // Fifth daemon argument; nodeA submits workerNo + 1 end-of-stream jobs.
+ private long workerNo;
+ // The SCA node created in start() and shut down in stop().
+ private SCANode node;
+ // Loop condition of waitForever(); set to true there on interruption.
+ private boolean stopped = false;
+ // Controller obtained from the DaemonContext in init(); not used elsewhere
+ // in this class.
+ private DaemonController controller = null;
+ // Never assigned by active code in this class (only by commented-out lines).
+ private Thread thread = null;
+ // Rule file read in start(); can be overridden by an optional sixth argument.
+ private String ruleFile = "workerRules.drl";
+
+ /*
+ * public static void main(String[] args) throws Exception {
+ * // Check that the correct arguments have been provided if (null == args ||
+ * args.length < 4) { System.err.println("Usage: java WorkpoolNode
+ * domainname nodename iterTest workerNo"); System.exit(1); }
+ *
+ * try { String domainName = args[0]; String nodeName = args[1]; long
+ * iterations = Long.parseLong(args[2]); long jobsNo =
+ * Long.parseLong(args[3]); long workerNo = Long.parseLong(args[4]);
+ * ClassLoader cl = WorkpoolDaemon.class.getClassLoader();
+ *
+ * SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); node =
+ * nodeFactory.createSCANode(null, domainName);
+ * node.addContribution(nodeName, cl.getResource(nodeName + "/"));
+ * node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
+ * node.start(); // nodeA is the head node and runs some tests while all
+ * other nodes // simply listen for incoming messages
+ *
+ * FileReader rules = new FileReader("workerRules.drl"); StringBuffer buffer =
+ * new StringBuffer();
+ *
+ * BufferedReader br = new BufferedReader(rules); String ruleString; do {
+ * ruleString = br.readLine(); if (ruleString!=null) {
+ * buffer.append(ruleString);} } while (ruleString!=null);
+ *
+ * if ( nodeName.equals("nodeA") ) { // do some application stuff
+ * WorkpoolService workpoolService =
+ * node.getDomain().getService(WorkpoolService.class,
+ * "WorkpoolServiceComponent"); workpoolService.start();
+ * NodeManagerInitService nodeInit =
+ * node.getDomain().getService(NodeManagerInitService.class,
+ * "WorkpoolManagerComponent/NodeManagerInitService");
+ * nodeInit.setNode(node); WorkpoolManager workpoolManager =
+ * node.getDomain().getService(WorkpoolManager.class,
+ * "WorkpoolManagerComponent/WorkpoolManager");
+ * workpoolManager.setWorkpoolReference(node.getDomain().getServiceReference(WorkpoolService.class,
+ * "WorkpoolServiceComponent"));
+ * workpoolManager.acceptRules(buffer.toString()); workpoolManager.start();
+ * int items[] = {3,4,5,6,3,6,3,5,9,5,6};
+ *
+ * double x = 398349; for (int i = 0; i < jobsNo; ++i)
+ * workpoolService.submit(new TestJob(x,iterations,items));
+ *
+ * TestJob j = new TestJob(-1.0,true); for (int i = 0; i < workerNo+1; ++i){
+ * j.setEOS(); workpoolService.submit(j); } } try { if
+ * (nodeName.equals("nodeB")) { NodeManagerInitService serviceNodeB =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeBComponent/NodeManagerInitService");
+ * serviceNodeB.setNode(node); } if (nodeName.equals("nodeC")) {
+ * NodeManagerInitService workerManagerC =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeCComponent/NodeManagerInitService");
+ * workerManagerC.setNode(node); } if (nodeName.equals("nodeD")) {
+ * NodeManagerInitService workerManagerD =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeDComponent/NodeManagerInitService");
+ * workerManagerD.setNode(node); } if (nodeName.equals("nodeE")) {
+ * NodeManagerInitService workerManagerE =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeEComponent/NodeManagerInitService");
+ * workerManagerE.setNode(node); }
+ *
+ * System.out.println("Node started (press enter to shutdown)");
+ * System.in.read(); } catch (IOException e) { e.printStackTrace(); } //
+ * stop the node and all the domains in it node.stop(); node.destroy();
+ * System.exit(0); } catch(Exception ex) { System.err.println("Exception in
+ * node - " + ex.getMessage()); ex.printStackTrace(System.err); } }
+ */
+ /**
+ * Daemon lifecycle callback. Currently a no-op: the node itself is stopped
+ * and destroyed in stop().
+ */
+ public void destroy() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+  * Reads the daemon configuration from the launcher arguments.
+  *
+  * Expected arguments, in order: domain name, node name, iterations,
+  * number of jobs, number of workers and, optionally, the rule file to
+  * use instead of the default.
+  *
+  * @param arg0 the daemon context supplying the arguments and controller
+  * @throws IllegalArgumentException if fewer than five arguments are given
+  * @throws NumberFormatException if a numeric argument cannot be parsed
+  */
+ public void init(DaemonContext arg0) throws Exception {
+     String[] args = arg0.getArguments();
+     // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+     if (args == null || args.length < 5) {
+         throw new IllegalArgumentException(
+                 "Usage: WorkpoolDaemon domainname nodename iterations jobsNo workerNo [ruleFile]");
+     }
+     domainName = args[0];
+     nodeName = args[1];
+     iterations = Long.parseLong(args[2]);
+     jobsNo = Long.parseLong(args[3]);
+     workerNo = Long.parseLong(args[4]);
+     // Honour the rule file whenever it is present, not only when there are
+     // exactly six arguments.
+     if (args.length >= 6) {
+         ruleFile = args[5];
+     }
+     this.controller = arg0.getController();
+     // this.thread=new Thread(this);
+ }
+
+ public void start() throws Exception {
+
+ ClassLoader cl = WorkpoolDaemon.class.getClassLoader();
+
+ SCANodeFactory nodeFactory = SCANodeFactory.newInstance();
+ node = nodeFactory.createSCANode(null, domainName);
+ node.addContribution(nodeName, cl.getResource(nodeName + "/"));
+ node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
+ node.start();
+ // nodeA is the head node and runs some tests while all other nodes
+ // simply listen for incoming messages
+
+ FileReader rules = new FileReader(ruleFile);
+ StringBuffer buffer = new StringBuffer();
+
+ BufferedReader br = new BufferedReader(rules);
+ String ruleString;
+ do {
+ ruleString = br.readLine();
+ if (ruleString != null) {
+ buffer.append(ruleString + "\n");
+ }
+ } while (ruleString != null);
+
+ if (nodeName.equals("nodeA")) {
+ // do some application stuff
+ WorkpoolService workpoolService = node.getDomain().getService(
+ WorkpoolService.class, "WorkpoolServiceComponent");
+ workpoolService.start();
+ NodeManagerInitService nodeInit = node.getDomain().getService(
+ NodeManagerInitService.class,
+ "WorkpoolManagerComponent/NodeManagerInitService");
+ nodeInit.setNode(node);
+ WorkpoolManager workpoolManager = node.getDomain().getService(
+ WorkpoolManager.class,
+ "WorkpoolManagerComponent/WorkpoolManager");
+ workpoolManager.setWorkpoolReference(node.getDomain()
+ .getServiceReference(WorkpoolService.class,
+ "WorkpoolServiceComponent"));
+ workpoolManager.acceptRules(buffer.toString());
+ workpoolManager.start();
+
+ int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 };
+
+ double x = 398349;
+ for (int i = 0; i < jobsNo; ++i) {
+ workpoolService.submit(new TestJob(x, iterations, items));
+ }
+ TestJob j = new TestJob(-1.0, true);
+ for (int i = 0; i < workerNo + 1; ++i) {
+ j.setEOS();
+ workpoolService.submit(j);
+ }
+
+ }
+ if (nodeName.equals("nodeB")) {
+ NodeManagerInitService workerManagerNodeB = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeBComponent/NodeManagerInitService");
+ workerManagerNodeB.setNode(node);
+ }
+
+ if (nodeName.equals("nodeC")) {
+ NodeManagerInitService workerManagerNodeC = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeCComponent/NodeManagerInitService");
+ workerManagerNodeC.setNode(node);
+ }
+
+ if (nodeName.equals("nodeD")) {
+ NodeManagerInitService workerManagerNodeD = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeDComponent/NodeManagerInitService");
+ workerManagerNodeD.setNode(node);
+ }
+
+ if (nodeName.equals("nodeE")) {
+ NodeManagerInitService workerManagerNodeE = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeEComponent/NodeManagerInitService");
+ workerManagerNodeE.setNode(node);
+ }
+
+ this.waitForever();
+ // this.thread.start();
+ }
+
+ /**
+  * Releases the thread blocked in waitForever() and shuts the node down.
+  *
+  * The previous implementation interrupted the calling thread, which is
+  * not the thread parked in waitForever(); the waiter was never released
+  * and the node shutdown ran with the caller's interrupt flag set.
+  *
+  * @throws Exception if stopping or destroying the node fails
+  */
+ public void stop() throws Exception {
+     // waitForever() waits on this object's monitor while !stopped, so
+     // flipping the flag and notifying under the same monitor wakes it.
+     synchronized (this) {
+         stopped = true;
+         notifyAll();
+     }
+     // node is still null if start() failed before creating it.
+     if (node != null) {
+         node.stop();
+         node.destroy();
+     }
+ }
+
+ /**
+  * Blocks on this object's monitor until the stopped flag is set. An
+  * interruption while waiting sets the flag itself and ends the wait.
+  */
+ private synchronized void waitForever() {
+     try {
+         while (!stopped) {
+             wait();
+         }
+     } catch (InterruptedException interrupted) {
+         // Treat interruption as a request to stop waiting.
+         stopped = true;
+     }
+ }
+
+ /**
+ * Runnable entry point; simply blocks in waitForever(). No active code in
+ * this class creates a thread that runs it.
+ */
+ public void run() {
+ waitForever();
+ }
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package node;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.tuscany.sca.assembly.Composite;
+import org.apache.tuscany.sca.assembly.Service;
+import org.apache.tuscany.sca.contribution.Contribution;
+import org.apache.tuscany.sca.contribution.DeployedArtifact;
+import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
+import org.apache.tuscany.sca.domain.SCADomain;
+import org.apache.tuscany.sca.node.NodeManagerInitService;
+import org.apache.tuscany.sca.node.SCANode;
+import org.apache.tuscany.sca.node.SCANodeFactory;
+import org.apache.tuscany.sca.node.impl.SCANodeImpl;
+import java.net.URI;
+
+import workpool.WorkerManager;
+import workpool.WorkerManagerImpl;
+import workpool.WorkpoolManager;
+import workpool.WorkpoolService;
+import workpool.WorkpoolServiceImpl;
+
+/**
+ * This client program shows how to run a distributed SCA node. In this case a
+ * workpool node has been constructed specifically for running the Workpool
+ * composite. Internally it creates a representation of a node and associates a
+ * distributed domain with the node. This separation is made so that different
+ * implementations of the distributed domain can be provided.
+ */
+public class WorkpoolNode {
+
+    /** Printed when too few arguments are supplied; lists all five that are read. */
+    private static final String USAGE =
+            "Usage: java WorkpoolNode domainname nodename iterTest jobsNo workerNo";
+
+    /**
+     * Starts an SCA node for the Workpool composite, performs the setup that
+     * belongs to the given node name and keeps running until a line (or end
+     * of input) is read from standard input, then stops the node and exits.
+     *
+     * @param args domain name, node name, iterations per job, number of jobs,
+     *             number of workers
+     * @throws Exception if the node cannot be started or the rule file
+     *         cannot be read
+     */
+    public static void main(String[] args) throws Exception {
+
+        // Five arguments are read below; the old check only required four
+        // and then failed on args[4].
+        if (null == args || args.length < 5) {
+            System.err.println(USAGE);
+            System.exit(1);
+        }
+        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+        String domainName = args[0];
+        String nodeName = args[1];
+        long iterations = Long.parseLong(args[2]);
+        long jobsNo = Long.parseLong(args[3]);
+        long workerNo = Long.parseLong(args[4]);
+        ClassLoader cl = WorkpoolNode.class.getClassLoader();
+
+        SCANodeFactory nodeFactory = SCANodeFactory.newInstance();
+        SCANode node = nodeFactory.createSCANode(null, domainName);
+        node.addContribution(nodeName, cl.getResource(nodeName + "/"));
+        node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
+        node.start();
+        // nodeA is the head node and runs some tests while all other nodes
+        // simply listen for incoming messages
+
+        // Read on every node, as before, so a missing file fails start-up.
+        String rules = readRules("workerRules.drl");
+
+        if (nodeName.equals("nodeA")) {
+            // do some application stuff
+            WorkpoolService workpoolService = node.getDomain().getService(
+                    WorkpoolService.class, "WorkpoolServiceComponent");
+            workpoolService.start();
+            NodeManagerInitService nodeInit = node.getDomain().getService(
+                    NodeManagerInitService.class,
+                    "WorkpoolManagerComponent/NodeManagerInitService");
+            nodeInit.setNode(node);
+            WorkpoolManager workpoolManager = node.getDomain().getService(
+                    WorkpoolManager.class,
+                    "WorkpoolManagerComponent/WorkpoolManager");
+            workpoolManager.setWorkpoolReference(node.getDomain()
+                    .getServiceReference(WorkpoolService.class,
+                            "WorkpoolServiceComponent"));
+            workpoolManager.setCycleTime(8000);
+            workpoolManager.acceptRules(rules);
+            workpoolManager.start();
+            int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 };
+
+            double x = 398349;
+
+            for (int i = 0; i < jobsNo; ++i) {
+                workpoolService.submit(new TestJob(x, iterations, items));
+            }
+
+            TestJob j = new TestJob(-1.0, true);
+            for (int i = 0; i < workerNo + 1; ++i) {
+                j.setEOS();
+                workpoolService.submit(j);
+            }
+        }
+        try {
+            // Worker nodes: nodeX registers with WorkerManagerNodeXComponent.
+            String[] workerSuffixes = { "B", "C", "D", "E" };
+            for (int i = 0; i < workerSuffixes.length; ++i) {
+                if (nodeName.equals("node" + workerSuffixes[i])) {
+                    NodeManagerInitService workerManager = node
+                            .getDomain()
+                            .getService(NodeManagerInitService.class,
+                                    "WorkerManagerNode" + workerSuffixes[i]
+                                            + "Component/NodeManagerInitService");
+                    workerManager.setNode(node);
+                }
+            }
+
+            System.out.println("Node started (press enter to shutdown)");
+            // One line (or end of input) is the shutdown signal, as the prompt
+            // promises. The old loop only ended at end of input, consumed two
+            // lines per iteration and could print "null".
+            try {
+                in.readLine();
+            } catch (IOException ex) {
+                // Standard input is unusable: shut down, as before.
+            }
+            // stop the node and all the domains in it
+            node.stop();
+            node.destroy();
+            System.exit(0);
+        } catch (Exception ex) {
+            System.err.println("Exception in node - " + ex.getMessage());
+            ex.printStackTrace(System.err);
+        }
+    }
+
+    /** Reads the whole rule file, terminating every line with '\n', and closes it. */
+    private static String readRules(String path) throws IOException {
+        StringBuilder text = new StringBuilder();
+        BufferedReader reader = new BufferedReader(new FileReader(path));
+        try {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                text.append(line).append('\n');
+            }
+        } finally {
+            // The original never closed the reader; release the file handle.
+            reader.close();
+        }
+        return text.toString();
+    }
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/workerRules1.drl
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/workerRules1.drl?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/workerRules1.drl (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/node/workerRules1.drl Fri Mar 14 15:29:46 2008
@@ -0,0 +1,13 @@
+package workpool
+import workpool.*;
+// Matches a WorkpoolBean whose singleAction is false and whose jobComputed
+// is greater than 500. The consequence calls setSingleAction() and then
+// addWorkerToNode() once for each of nodeB, nodeC, nodeD and nodeE.
+// NOTE(review): presumably setSingleAction() makes singleAction true so the
+// rule fires only once per bean - confirm in WorkpoolBean.
+rule "WorkerAdder1"
+ when
+ $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500))
+ then
+ $workerBean.setSingleAction()
+ $workerBean.addWorkerToNode("nodeB")
+ $workerBean.addWorkerToNode("nodeC")
+ $workerBean.addWorkerToNode("nodeD")
+ $workerBean.addWorkerToNode("nodeE")
+end
+
\ No newline at end of file
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.io.StringReader;
+import java.net.URI;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.logging.Logger;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.tuscany.sca.assembly.MetaComponent;
+import org.apache.tuscany.sca.assembly.impl.DefaultMetaComponent;
+
+public class MetaComponentWorker extends DefaultMetaComponent {
+
+ private SecureRandom prng;
+ private String componentName;
+ private String scdl;
+ private String javaClass;
+ private boolean loadedFromString = false;
+ private Logger log = Logger.getLogger(MetaComponentWorker.class.getName());
+
+ public MetaComponentWorker() {
+ componentName = "WorkerComponent"
+ + java.util.UUID.randomUUID().toString();
+ }
+
+ public void setWorkerName(String componentName) {
+ this.componentName = componentName;
+ }
+
+ public void setWorkerClass(String javaClass) {
+ this.javaClass = javaClass;
+ }
+
+ private String generateSCDL() {
+ StringBuffer buffer = new StringBuffer(512);
+ buffer
+ .append("<component xmlns=\"http://www.osoa.org/xmlns/sca/1.0\" name=\"");
+ buffer.append(this.componentName);
+ buffer.append("\">\n");
+ buffer.append("<implementation.java class=\"");
+ buffer.append(this.javaClass);
+ buffer.append("\"/>");
+ buffer.append("<property name=\"workerName\">");
+ buffer.append(this.componentName);
+ buffer.append("</property>\n</component>");
+ return buffer.toString();
+ }
+
+ @Override
+ public XMLStreamReader build() throws Exception {
+ XMLInputFactory factory = XMLInputFactory.newInstance();
+ if (!loadedFromString)
+ scdl = generateSCDL();
+ return factory.createXMLStreamReader(new StringReader(scdl));
+
+ }
+
+ public String getName() {
+
+ return componentName;
+ }
+
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MyWorker.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MyWorker.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MyWorker.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MyWorker.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
+import org.apache.tuscany.sca.databinding.job.RemoteJob;
+import org.osoa.sca.annotations.Scope;
+
+@Scope("COMPOSITE")
+public class MyWorker extends WorkerServiceImpl<Object, Double> {
+    /** Number of results produced so far; only used for the console trace. */
+    private static int resultcount = 0;
+
+    /**
+     * Runs the given job as a RemoteJob with its own execution context and
+     * wraps the value it computes, under the key "result", in a ResultJob.
+     */
+    @Override
+    public ResultJob computeTask(Job<Object, Double> job) {
+
+        RemoteJob task = (RemoteJob) job;
+        System.out.println("Computing the job");
+        JobExecutionContext executionContext = task.getContext();
+
+        ResultJob answer = new ResultJob();
+        JobDataMap payload = new JobDataMap();
+        payload.addJobData("result", task.compute(executionContext));
+        answer.setJobDataMap(payload);
+
+        resultcount += 1;
+        System.out.println("Count result = " + resultcount);
+        return answer;
+    }
+
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MyWorker.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/MyWorker.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/NullJob.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/NullJob.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/NullJob.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/NullJob.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+
+/**
+ * Serializable Job that carries no work and no data: it computes nothing,
+ * has no data map, is not an end-of-stream marker and reports its type as
+ * Job.NULL_JOB.
+ */
+public class NullJob implements Job, java.io.Serializable {
+
+ /** Ignores its argument and always returns null. */
+ public Object compute(Object arg0) {
+ // Intentionally does nothing.
+ return null;
+ }
+
+ /** Always returns null; this job has no data map. */
+ public JobDataMap getDataMap() {
+ return null;
+ }
+
+ /** Always returns false. */
+ public boolean eos() {
+ return false;
+ }
+
+ /** Returns Job.NULL_JOB. */
+ public int getType() {
+ return Job.NULL_JOB;
+ }
+
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/NullJob.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/NullJob.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/ResultJob.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/ResultJob.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/ResultJob.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/ResultJob.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
+import org.apache.tuscany.sca.databinding.job.RemoteJob;
+
+/**
+ * Serializable RemoteJob that only transports a JobDataMap. It reports its
+ * type as Job.RESULT_JOB, its eos() is always true and its compute() does
+ * nothing.
+ */
+public class ResultJob extends RemoteJob<Object> implements
+ java.io.Serializable {
+ // Data carried by this job; null until setJobDataMap() is called.
+ private JobDataMap map;
+
+ /** Returns the map stored by setJobDataMap(), or null if none was set. */
+ public JobDataMap getDataMap() {
+ return map;
+ }
+
+ /** Stores the map returned by getDataMap(). */
+ public void setJobDataMap(JobDataMap map) {
+ this.map = map;
+ }
+
+ /** Always returns true. */
+ public boolean eos() {
+ // Unconditionally true for every ResultJob.
+ return true;
+ }
+
+ /** Returns Job.RESULT_JOB. */
+ public int getType() {
+ // Fixed type tag for result jobs.
+ return Job.RESULT_JOB;
+ }
+
+ /** Ignores the context and always returns null. */
+ @Override
+ public Object compute(JobExecutionContext v) {
+ // A result job has nothing to compute.
+ return null;
+ }
+
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/ResultJob.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/ResultJob.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/Trigger.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/Trigger.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/Trigger.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/Trigger.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.osoa.sca.annotations.Remotable;
+
+/**
+ * Remotable event-handler interface, exchanged using the Tuscany
+ * {@code Job} data binding.
+ * <p>
+ * NOTE(review): presumably intended as a job-completion handler (see the
+ * commented-out {@code addJobCompleteHandler} in {@code WorkerService});
+ * confirm before relying on that.
+ *
+ * @param <T> type of the event payload passed to the handler
+ */
+@Remotable
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+public interface Trigger<T> {
+ /**
+ * Handles a single event.
+ *
+ * @param c the event payload
+ */
+ void handleEvent(T c);
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/Trigger.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/Trigger.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.osoa.sca.annotations.Remotable;
+
+/**
+ * Remotable management interface for the workers hosted on one node.
+ * <p>
+ * The behaviour notes below describe what {@code WorkerManagerImpl} does;
+ * other implementations may differ.
+ */
+@Remotable
+public interface WorkerManager {
+ /**
+ * Creates a new worker on this node.
+ *
+ * @return a reference to the new worker; {@code WorkerManagerImpl} returns
+ * {@code null} when the worker could not be created
+ */
+ CallableReferenceImpl<WorkerService> addWorker();
+
+ /**
+ * Removes the worker component with the given name.
+ *
+ * @param workerName name of the worker component to remove
+ * @return {@code true} if a component with that name was found,
+ * {@code false} otherwise
+ */
+ boolean removeWorker(String workerName);
+
+ /**
+ * Stops {@code k} workers.
+ *
+ * @param k number of workers to stop
+ * @return {@code false} if the request was refused; {@code WorkerManagerImpl}
+ * refuses when {@code k} is not smaller than the number of active
+ * workers
+ */
+ boolean removeWorkers(int k);
+
+ /**
+ * Stops every worker managed on this node.
+ *
+ * @return {@code true} once all workers have been asked to stop
+ */
+ boolean removeAllWorkers();
+
+ /**
+ * Reports the current load of the node.
+ *
+ * @return a load figure; {@code WorkerManagerImpl} derives it from the first
+ * field of {@code /proc/loadavg} divided by the processor count
+ */
+ double getNodeLoad();
+
+ /**
+ * @return the number of workers currently tracked as active
+ */
+ int activeWorkers();
+
+ /**
+ * Starts the manager. {@code WorkerManagerImpl} currently does nothing here.
+ */
+ void start();
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.assembly.Composite;
+import org.apache.tuscany.sca.contribution.Contribution;
+import org.apache.tuscany.sca.contribution.DeployedArtifact;
+import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.node.NodeManagerInitService;
+import org.apache.tuscany.sca.node.SCANode;
+import org.apache.tuscany.sca.node.impl.SCANodeImpl;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.osoa.sca.CallableReference;
+import org.osoa.sca.ComponentContext;
+import org.osoa.sca.annotations.Context;
+import org.osoa.sca.annotations.Property;
+import org.osoa.sca.annotations.Scope;
+import org.osoa.sca.annotations.Service;
+import java.util.LinkedList;
+import java.util.ArrayList;
+
+/**
+ * Node-local {@link WorkerManager}: creates worker components inside a
+ * composite of the node's contribution, keeps track of the workers it has
+ * started, asks them to stop, and reports the node load.
+ * <p>
+ * The hosting node is supplied through {@link #setNode(SCANode)}; the
+ * contribution name, composite URI and worker class come from SCA properties.
+ * <p>
+ * NOTE(review): the worker list is not synchronized; confirm that the runtime
+ * does not invoke this composite-scoped component concurrently.
+ */
+@Scope("COMPOSITE")
+@Service(interfaces = { NodeManagerInitService.class, WorkerManager.class })
+public class WorkerManagerImpl implements WorkerManager, NodeManagerInitService {
+    /** Linux pseudo-file whose first field is read as the node load. */
+    private static final String LOAD_AVERAGE_FILE = "/proc/loadavg";
+
+    private Logger log = Logger.getLogger(WorkerManagerImpl.class.getName());
+    /** Workers started by {@link #addWorker()} and not yet asked to stop. */
+    private LinkedList<CallableReferenceImpl<WorkerService>> activeWorkers = new LinkedList<CallableReferenceImpl<WorkerService>>();
+    /** Component names of every worker created by {@link #addWorker()}. */
+    private List<String> workerComponentNames = new ArrayList<String>();
+    private SCANodeImpl node;
+    /** Name used to look up the contribution and to remove components. */
+    @Property
+    protected String nodeName;
+    /** URI of the composite that new workers are added to. */
+    @Property
+    protected String compositeName;
+    /** Implementation class handed to each new worker meta-component. */
+    @Property
+    protected String workerClass;
+    @Context
+    protected ComponentContext context;
+
+    /**
+     * Searches the deployed artifacts for the composite whose URI equals
+     * {@link #compositeName}.
+     *
+     * @param artifacts artifacts of the contribution to search
+     * @return the matching composite, or {@code null} if there is none
+     */
+    private Composite findComposite(List<DeployedArtifact> artifacts) {
+        for (DeployedArtifact fact : artifacts) {
+            if (fact.getModel() instanceof Composite) {
+                log.info("Searching in a contribution deployed artifacts -"
+                        + compositeName);
+                Composite augmented = (Composite) fact.getModel();
+                if (augmented.getURI().equals(compositeName)) {
+                    log.info("Found composite..." + compositeName);
+                    return augmented;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Adds a new worker component to the configured composite, starts it,
+     * records it as active and registers this manager with it.
+     *
+     * @return a reference to the new worker, or {@code null} when the
+     *         contribution, the composite or the created component could not
+     *         be found, or when component creation failed
+     */
+    public CallableReferenceImpl<WorkerService> addWorker() {
+        log.info("Adding a new worker call..");
+        long addWorkerStartTime = System.nanoTime();
+        ContributionServiceImpl cServiceImpl = (ContributionServiceImpl) node
+                .getNodeRuntime().getContributionService();
+        Contribution contribution = cServiceImpl.getContribution(nodeName);
+        if (contribution == null) {
+            // Without the contribution there is nothing to add the worker to.
+            log.warning("Contribution not found: " + nodeName);
+            return null;
+        }
+        log.info("Instantiating a metacomponent..");
+        MetaComponentWorker mcw = new MetaComponentWorker();
+        mcw.setWorkerClass(workerClass);
+        // Locate the composite the worker will be added to.
+        Composite augmented = findComposite(contribution.getArtifacts());
+        CallableReferenceImpl<WorkerService> ref = null;
+        try {
+            if (augmented != null) {
+                long startCreation = System.nanoTime();
+                node.addComponentToComposite(mcw, contribution.getURI(),
+                        augmented.getURI());
+                System.out.println("addComponentToComposite time = "
+                        + (System.nanoTime() - startCreation));
+                RuntimeComponent workerComponent = (RuntimeComponent) node
+                        .getComponent(mcw.getName());
+                if (workerComponent != null) {
+                    ref = (CallableReferenceImpl<WorkerService>) workerComponent
+                            .getComponentContext().createSelfReference(
+                                    WorkerService.class);
+                    ref.getService().start();
+                    activeWorkers.addLast(ref);
+                    workerComponentNames.add(mcw.getName());
+                    log.info(context.getURI());
+                    // Hand the worker a reference back to this manager so it
+                    // can request its own removal.
+                    CallableReferenceImpl<WorkerManager> manager = (CallableReferenceImpl) context
+                            .createSelfReference(WorkerManager.class,
+                                    "WorkerManager");
+                    ref.getService().registerManager(manager);
+                }
+            } else {
+                log.info("Workpool composite not found!");
+            }
+        } catch (Exception e) {
+            log.log(java.util.logging.Level.SEVERE, "Exception activation", e);
+        }
+        // Reported on every path that reaches component creation, including
+        // the successful one.
+        System.out.println("Component Creation Time ="
+                + (System.nanoTime() - addWorkerStartTime));
+        return ref;
+    }
+
+    /**
+     * Asks every active worker to stop and forgets them, so that
+     * {@link #activeWorkers()} drops to zero and no worker is stopped twice.
+     *
+     * @return always {@code true}
+     */
+    public boolean removeAllWorkers() {
+        for (CallableReferenceImpl<WorkerService> callable : activeWorkers) {
+            callable.getService().stop();
+        }
+        activeWorkers.clear();
+        return true;
+    }
+
+    /**
+     * Asks the most recently added active worker to stop and stops tracking it.
+     *
+     * @return {@code true} if a worker was stopped, {@code false} if there was
+     *         no active worker
+     */
+    public boolean removeWorker() {
+        if (activeWorkers.isEmpty()) {
+            return false;
+        }
+        CallableReferenceImpl<WorkerService> callable = activeWorkers
+                .removeLast();
+        callable.getService().stop();
+        return true;
+    }
+
+    /**
+     * Stops {@code k} workers, most recently added first.
+     *
+     * @param k number of workers to stop
+     * @return {@code false} without stopping anything when {@code k} is not
+     *         smaller than the number of active workers, or when a worker
+     *         could not be stopped; {@code true} otherwise
+     */
+    public boolean removeWorkers(int k) {
+        if (k >= activeWorkers.size()) {
+            return false;
+        }
+        for (int i = 0; i < k; ++i) {
+            if (!removeWorker()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Stores the hosting node.
+     *
+     * @param node the node; must be an {@link SCANodeImpl}
+     */
+    public void setNode(SCANode node) {
+        this.node = (SCANodeImpl) node;
+    }
+
+    /**
+     * Reports the node load as the first field of {@code /proc/loadavg}
+     * divided by the number of available processors.
+     * <p>
+     * FIXME [jo] this works only on Linux; to be replaced with a JNI extension.
+     *
+     * @return the scaled load, or {@code 0} when the file is missing,
+     *         unreadable, empty or does not start with a number
+     */
+    public double getNodeLoad() {
+        int processors = Runtime.getRuntime().availableProcessors();
+        double perProcessorScale = 1.0;
+        if (processors > 1) {
+            perProcessorScale = 1 / (1.0 * processors);
+        }
+        return parseFirstField(readLoadAverageLine()) * perProcessorScale;
+    }
+
+    /**
+     * Reads the first line of the load-average file, always closing the file.
+     *
+     * @return the line, or {@code null} if the file cannot be opened or read
+     */
+    private String readLoadAverageLine() {
+        RandomAccessFile statfile = null;
+        try {
+            statfile = new RandomAccessFile(LOAD_AVERAGE_FILE, "r");
+            return statfile.readLine();
+        } catch (FileNotFoundException e) {
+            log.warning(LOAD_AVERAGE_FILE + " is not available: " + e);
+            return null;
+        } catch (IOException e) {
+            log.warning("Cannot read " + LOAD_AVERAGE_FILE + ": " + e);
+            return null;
+        } finally {
+            if (statfile != null) {
+                try {
+                    statfile.close();
+                } catch (IOException ignored) {
+                    // Nothing useful can be done if closing a read-only file fails.
+                }
+            }
+        }
+    }
+
+    /**
+     * Parses the first space-separated field of a line as a double.
+     *
+     * @param line the line to parse; may be {@code null}
+     * @return the parsed value, or {@code 0} when the line is {@code null},
+     *         has no field or the field is not a number
+     */
+    private double parseFirstField(String line) {
+        if (line == null) {
+            return 0;
+        }
+        java.util.StringTokenizer st = new java.util.StringTokenizer(line, " ");
+        if (!st.hasMoreTokens()) {
+            return 0;
+        }
+        try {
+            return Double.parseDouble(st.nextToken());
+        } catch (NumberFormatException e) {
+            log.warning("Unexpected content in " + LOAD_AVERAGE_FILE + ": "
+                    + line);
+            return 0;
+        }
+    }
+
+    /**
+     * @return the number of workers started here and not yet asked to stop
+     */
+    public int activeWorkers() {
+        return activeWorkers.size();
+    }
+
+    /**
+     * Removes the named worker component from the composite. The list of
+     * active workers is not touched by this method.
+     *
+     * @param workerName name of the component to remove
+     * @return {@code true} if the component exists and its removal was
+     *         requested, {@code false} if no such component was found
+     */
+    public boolean removeWorker(String workerName) {
+        RuntimeComponent workerComponent = (RuntimeComponent) node
+                .getComponent(workerName);
+        if (workerComponent != null) {
+            log.info("Removing component " + workerName);
+            // NOTE(review): the composite is hard-coded here while addWorker()
+            // uses the compositeName property; confirm they always agree.
+            node.removeComponentFromComposite(nodeName, "Workpool.composite",
+                    workerName);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Does nothing for now.
+     */
+    public void start() {
+        // do nothing for now.
+    }
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerService.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerService.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerService.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerService.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.osoa.sca.ServiceReference;
+import org.osoa.sca.annotations.Callback;
+import org.osoa.sca.annotations.Remotable;
+import org.osoa.sca.annotations.OneWay;
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.apache.tuscany.sca.databinding.job.Job;
+
+/**
+ * Remotable interface of a workpool worker: it receives jobs, can be started
+ * and stopped, and is told which manager and which workpool service it
+ * belongs to. Results may be returned through {@link WorkerServiceCallback}.
+ *
+ * @param <T> first type parameter of the {@link Job}s handled
+ * @param <E> second type parameter of the {@link Job}s handled
+ */
+@Remotable
+@Callback(WorkerServiceCallback.class)
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+public interface WorkerService<T, E> {
+ /**
+ * Submits a job to the worker. One-way: the caller does not wait for a
+ * result.
+ *
+ * @param j the job to process
+ */
+ @OneWay
+ void compute(Job<T, E> j);
+
+ /**
+ * Puts the worker into the running state.
+ */
+ void start();
+
+ /**
+ * Asks the worker to stop; {@code WorkerServiceImpl} only sets a flag that is
+ * checked when the next job arrives.
+ */
+ void stop();
+
+ // Planned trigger API, not enabled yet:
+ // void addJobCompleteHandler(String triggerName,
+ // CallableReferenceImpl<Trigger> handle);
+ // void removeJobCompleteHandler(String triggerName);
+ /**
+ * Registers the worker manager that owns this worker.
+ *
+ * @param wm reference to the worker manager
+ */
+ void registerManager(CallableReferenceImpl<WorkerManager> wm);
+
+ /**
+ * Registers the workpool service that results are sent to.
+ *
+ * @param sender reference to the workpool service
+ */
+ void registerSender(CallableReferenceImpl<WorkpoolService> sender);
+
+ // void init(Job nullJob);
+ /**
+ * Registers the workpool service and processes a first job in one call.
+ * One-way: the caller does not wait for a result.
+ *
+ * @param nullJob the first job to process
+ * @param myReference reference to the workpool service that results are
+ * sent to
+ */
+ @OneWay
+ void computeFirstTime(Job nullJob,
+ CallableReferenceImpl<WorkpoolService> myReference);
+
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerService.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.osoa.sca.annotations.Remotable;
+
+/**
+ * Remotable callback through which a worker reports on a job.
+ */
+@Remotable
+public interface WorkerServiceCallback {
+ /**
+ * Receives a job back from a worker.
+ *
+ * @param resultType the job being returned; {@code WorkerServiceImpl} passes
+ * either the computed result or the job it received
+ * @param reuse {@code WorkerServiceImpl} passes {@code true} when a stopped
+ * worker hands a job back without processing it, {@code false}
+ * otherwise (presumably so the job can be rescheduled; confirm
+ * against the receiver)
+ * @param workerName identifier of the reporting worker;
+ * {@code WorkerServiceImpl} passes its component URI
+ */
+ void receiveResult(Job resultType, boolean reuse, String workerName);
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.osoa.sca.ComponentContext;
+import org.osoa.sca.RequestContext;
+import org.osoa.sca.ServiceReference;
+import org.osoa.sca.annotations.Callback;
+import org.osoa.sca.annotations.Context;
+import org.osoa.sca.annotations.Property;
+import org.osoa.sca.annotations.Scope;
+import org.osoa.sca.annotations.Service;
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.apache.tuscany.sca.databinding.job.Job;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.*;
+
+/**
+ * Base implementation of {@link WorkerService}; subclasses supply the actual
+ * work in {@link #computeTask(Job)}.
+ * <p>
+ * Results are delivered in one of two ways. If a workpool service has been
+ * registered (via {@link #registerSender} or {@link #computeFirstTime}),
+ * results go to it through its callable reference; otherwise they go to the
+ * injected {@link WorkerServiceCallback}.
+ */
+@Service(WorkerService.class)
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+@Scope("COMPOSITE")
+public abstract class WorkerServiceImpl<T, E> implements WorkerService<T, E> {
+    private Logger log = Logger.getLogger(this.getClass().getName());
+    /** Callback injected by the runtime; used when no sender is registered. */
+    private WorkerServiceCallback workerServiceCallback;
+    @Context
+    protected ComponentContext workerContext;
+    @Context
+    protected RequestContext requestContext;
+    @Property
+    protected String workerName;
+    private CallableReferenceImpl<WorkerManager> managerReference = null;
+
+    /* TODO add the triggers, but before ask */
+    // protected Map<String,Trigger> triggers = new HashMap<String,Trigger>();
+
+    /**
+     * Performs the actual work for one job.
+     *
+     * @param job the job to process
+     * @return the result to report back
+     */
+    public abstract ResultJob computeTask(Job<T, E> job);
+
+    /** Set by {@link #stop()}; checked each time a job arrives. */
+    private boolean stopped = false;
+    /** Self reference passed along with every result on the callable path. */
+    private CallableReferenceImpl<WorkerService> serviceRef;
+    private CallableReferenceImpl<WorkpoolService> senderService;
+    private WorkpoolService wp = null;
+    private WorkerManager manager = null;
+
+    /**
+     * Marks the worker as running and creates its self reference.
+     */
+    public void start() {
+        log.info("Starting worker...");
+        stopped = false;
+        serviceRef = (CallableReferenceImpl) workerContext
+                .createSelfReference(WorkerService.class);
+    }
+
+    /**
+     * Processes an initial job through {@link #compute(Job)}.
+     *
+     * @param sender not used by this method
+     * @param nullJob the job to process
+     */
+    public void init(CallableReferenceImpl<WorkpoolService> sender, Job nullJob) {
+        compute(nullJob);
+    }
+
+    /**
+     * Marks the worker as stopped; takes effect when the next job arrives.
+     */
+    public void stop() {
+        stopped = true;
+    }
+
+    /**
+     * Receives the callback proxy from the runtime.
+     *
+     * @param workerServiceCallback the callback to report results to
+     */
+    @Callback
+    public void setWorkerServiceCallback(
+            WorkerServiceCallback workerServiceCallback) {
+        log.info("Setting worker callback");
+        this.workerServiceCallback = workerServiceCallback;
+    }
+
+    /**
+     * Registers the workpool service and immediately processes a first job
+     * on the callable-reference path.
+     *
+     * @param nullJob the first job to process
+     * @param sender reference to the workpool service that receives results
+     */
+    public void computeFirstTime(Job nullJob,
+            CallableReferenceImpl<WorkpoolService> sender) {
+        senderService = sender;
+        wp = sender.getService();
+        workWithCallable(nullJob);
+    }
+
+    /**
+     * Stores the manager reference and resolves its service proxy.
+     *
+     * @param wm reference to the worker manager owning this worker
+     */
+    public void registerManager(CallableReferenceImpl<WorkerManager> wm) {
+        managerReference = wm;
+        manager = managerReference.getService();
+    }
+
+    /**
+     * Stores the workpool service reference and resolves its service proxy;
+     * from now on {@link #compute(Job)} uses the callable-reference path.
+     *
+     * @param sender reference to the workpool service that receives results
+     */
+    public void registerSender(CallableReferenceImpl<WorkpoolService> sender) {
+        log.info("Registering sender..");
+        senderService = sender;
+        wp = sender.getService();
+    }
+
+    /**
+     * Asks the registered manager, if any, to remove this worker component.
+     *
+     * @param workerUri URI of this worker component
+     */
+    private void requestOwnRemoval(String workerUri) {
+        if (managerReference != null) {
+            manager.removeWorker(workerUri);
+        }
+    }
+
+    /**
+     * Processes a job and reports through the injected callback.
+     * <ul>
+     * <li>Stopped worker: the job is handed back unprocessed with
+     * {@code reuse=true} and the worker asks to be removed.</li>
+     * <li>End-of-stream job: the worker asks to be removed; nothing is
+     * reported.</li>
+     * <li>{@code NullJob}: the job itself is reported back.</li>
+     * <li>Anything else: the result of {@link #computeTask(Job)} is
+     * reported.</li>
+     * </ul>
+     *
+     * @param j the job to process
+     */
+    private void workWithInjection(Job j) {
+        log.info("Worker has received job");
+        String workerUri = workerContext.getURI();
+        if (stopped) {
+            workerServiceCallback.receiveResult(j, true, workerUri);
+            requestOwnRemoval(workerUri);
+            // The job was handed back for reuse; computing it here as well
+            // would report it twice.
+            return;
+        }
+        if (j.eos()) {
+            requestOwnRemoval(workerUri);
+            // Same as workWithCallable: an end-of-stream job is not computed.
+            return;
+        }
+        if (j instanceof NullJob) {
+            workerServiceCallback.receiveResult(j, false, workerUri);
+        } else {
+            workerServiceCallback.receiveResult(computeTask(j), false,
+                    workerUri);
+        }
+    }
+
+    /**
+     * Processes a job and reports to the registered workpool service.
+     * <ul>
+     * <li>Stopped worker: the job is handed back unprocessed with
+     * {@code reuse=true}.</li>
+     * <li>End-of-stream job: the worker asks to be removed; nothing is
+     * reported.</li>
+     * <li>Job of type {@code Job.NULL_JOB}: the job itself is reported back
+     * with the last flag set to {@code true}.</li>
+     * <li>Anything else: the result of {@link #computeTask(Job)} is
+     * reported.</li>
+     * </ul>
+     *
+     * @param j the job to process
+     */
+    private void workWithCallable(Job j) {
+        String workerUri = workerContext.getURI();
+        log.info("Worker " + workerUri + " has received job with eos --> "
+                + j.eos());
+        if (stopped) {
+            wp.handleResult(j, true, workerUri, serviceRef, false);
+            return;
+        }
+        if (j.eos()) {
+            log.info("Got poison token...");
+            if (managerReference != null) {
+                log.info("Removing component " + workerUri);
+                manager.removeWorker(workerUri);
+            }
+            return;
+        }
+        if (j.getType() != Job.NULL_JOB) {
+            wp.handleResult(computeTask(j), false, workerUri, serviceRef,
+                    false);
+        } else {
+            log.info("Got a null job");
+            wp.handleResult(j, false, workerUri, serviceRef, true);
+        }
+    }
+
+    /**
+     * Processes a job, choosing the delivery path: the callable reference
+     * when a workpool service has been registered, the injected callback
+     * otherwise.
+     *
+     * @param j the job to process
+     */
+    public void compute(Job<T, E> j) {
+        if (senderService != null) {
+            log.info("Computing job using callable reference method");
+            workWithCallable(j);
+        } else {
+            log.info("Computing job using reference injection method");
+            workWithInjection(j);
+        }
+    }
+    /*
+     * Planned trigger API, not enabled yet:
+     *
+     * public void addJobCompleteHandler(String triggerName,
+     * CallableReferenceImpl<Trigger> handle) { if
+     * (!triggers.containsKey(triggerName)) { triggers.put(triggerName,
+     * handle.getService()); } } public void removeJobCompleteHandler(String
+     * triggerName) { if (!triggers.containsKey(triggerName)) {
+     * triggers.remove(triggerName); } }
+     */
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.beans.*;
+import java.util.Vector;
+import java.util.logging.*;
+
+/**
+ * Plain holder for workpool measurements (load, worker and node counts, queue
+ * size, service and arrival times) that also turns add/remove-worker requests
+ * into {@link WorkpoolEvent}s delivered to registered
+ * {@link WorkpoolBeanListener}s.
+ * <p>
+ * Property-change listeners can be registered, but nothing in this class
+ * fires property-change events.
+ */
+public class WorkpoolBean {
+    private Logger log = Logger.getLogger(WorkpoolBean.class.getName());
+    private final PropertyChangeSupport changes = new PropertyChangeSupport(
+            this);
+    private Vector<WorkpoolBeanListener> listeners = new Vector<WorkpoolBeanListener>();
+
+    double loadAverage = 0;
+    int nodeNumbers = 0;
+    int workers = 0;
+    int estimedQueueSize = 0;
+    double averageServiceTime = 0;
+    double averageArrivalTime = 0;
+    /** Last value computed by {@link #setUsageFactor()}. */
+    double usageFactor = 0;
+    long jobComputed = 0;
+    boolean singleAction = false;
+
+    // ---- measurements -------------------------------------------------
+
+    public double getLoadAverage() {
+        return loadAverage;
+    }
+
+    public void setLoadAverage(double loadAverage) {
+        this.loadAverage = loadAverage;
+    }
+
+    public int getNodeNumbers() {
+        return nodeNumbers;
+    }
+
+    public void setNodeNumbers(int n) {
+        nodeNumbers = n;
+    }
+
+    public int getWorkers() {
+        return workers;
+    }
+
+    public void setWorkers(int w) {
+        workers = w;
+    }
+
+    public int getEstimedQueueSize() {
+        return this.estimedQueueSize;
+    }
+
+    public void setEstimedQueueSize(int size) {
+        this.estimedQueueSize = size;
+    }
+
+    public double getAverageServiceTime() {
+        return averageServiceTime;
+    }
+
+    public void setAverageServiceTime(double service) {
+        averageServiceTime = service;
+    }
+
+    public double getAverageArrivalTime() {
+        return averageArrivalTime;
+    }
+
+    public void setAverageArrivalTime(double service) {
+        averageArrivalTime = service;
+    }
+
+    /**
+     * @return the value last computed by {@link #setUsageFactor()}
+     */
+    public double getUtilizationFactor() {
+        return this.usageFactor;
+    }
+
+    /**
+     * Recomputes the usage factor as average service time divided by average
+     * arrival time, using the values currently stored.
+     */
+    public void setUsageFactor() {
+        this.usageFactor = this.averageServiceTime / this.averageArrivalTime;
+    }
+
+    public long getJobComputed() {
+        return jobComputed;
+    }
+
+    public void setJobComputed(long jobComputed) {
+        this.jobComputed = jobComputed;
+    }
+
+    public boolean getSingleAction() {
+        return this.singleAction;
+    }
+
+    /**
+     * Switches the single-action flag on; there is no way to switch it off.
+     */
+    public void setSingleAction() {
+        this.singleAction = true;
+    }
+
+    // ---- property-change listeners --------------------------------------
+
+    public void addPropertyChangeListener(final PropertyChangeListener l) {
+        changes.addPropertyChangeListener(l);
+    }
+
+    public void removePropertyChangeListener(final PropertyChangeListener l) {
+        changes.removePropertyChangeListener(l);
+    }
+
+    // ---- workpool listeners and events ----------------------------------
+
+    public synchronized void addListener(WorkpoolBeanListener l) {
+        listeners.add(l);
+    }
+
+    public synchronized void removeListener(WorkpoolBeanListener l) {
+        listeners.remove(l);
+    }
+
+    /**
+     * Delivers a private copy of the event to every registered listener.
+     */
+    private synchronized void fireWorkpoolEvent(WorkpoolEvent ev) {
+        for (WorkpoolBeanListener listener : listeners) {
+            WorkpoolEvent copy = new WorkpoolEvent(ev);
+            listener.handleEvent(copy);
+        }
+    }
+
+    /**
+     * Builds an event originating from this bean and fires it.
+     */
+    private void emit(int eventType, int count, String nodeName) {
+        fireWorkpoolEvent(new WorkpoolEvent(this, eventType, count, nodeName));
+    }
+
+    /**
+     * Requests {@code k} additional workers on the given node.
+     */
+    public void addWorkersToNode(int k, String nodeName) {
+        log.info("Adding a worker to node " + nodeName);
+        emit(WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER, k, nodeName);
+    }
+
+    /**
+     * Requests one additional worker on the given node.
+     */
+    public void addWorkerToNode(String nodeName) {
+        log.info("Adding a worker to node " + nodeName);
+        emit(WorkpoolEvent.SINGLE_ADD_WORKER, 1, nodeName);
+    }
+
+    /**
+     * Requests removal of {@code k} workers from the given node.
+     */
+    public void removeWorkersToNode(int k, String nodeName) {
+        log.info("Removing a worker to node " + nodeName);
+        emit(WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER, k, nodeName);
+    }
+
+    /**
+     * Requests removal of one worker from the given node.
+     */
+    public void removeWorkerToNode(String nodeName) {
+        log.info("Removing a worker to node " + nodeName);
+        emit(WorkpoolEvent.SINGLE_REMOVE_WORKER, 1, nodeName);
+    }
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.util.EventListener;
+
+/**
+ * Listener for the {@link WorkpoolEvent}s fired by {@code WorkpoolBean}.
+ */
+public interface WorkpoolBeanListener extends EventListener {
+ /**
+ * Called once for each fired event; {@code WorkpoolBean} hands every
+ * listener its own copy of the event.
+ *
+ * @param ev the event to handle
+ */
+ public void handleEvent(WorkpoolEvent ev);
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.util.EventObject;
+
+/**
+ * Immutable event object carrying an integer event type, a worker count and
+ * the name of a node.
+ * <p>
+ * All constructors funnel into the four-argument one so that every field is
+ * always assigned. In particular, an event created without a node name
+ * reports the empty string from {@link #getNodeName()} rather than
+ * {@code null}.
+ */
+public class WorkpoolEvent extends EventObject {
+
+    private static final long serialVersionUID = -1273928009411948768L;
+
+    // Event type codes intended for the "typeEv" constructor argument.
+    // The constructors do not validate the type against these values.
+    public static final int EVENT_MULTIPLE_ADD_WORKER = 0;
+    public static final int EVENT_MULTIPLE_REMOVE_WORKER = 1;
+    public static final int SINGLE_REMOVE_WORKER = 2;
+    public static final int SINGLE_ADD_WORKER = 3;
+
+    private final int type;
+    private final int noWorker;
+    private final String nodeName;
+
+    /**
+     * Creates an event with type {@code 0}, a worker count of {@code 0} and
+     * an empty node name.
+     *
+     * @param source the object on which the event occurred
+     * @throws IllegalArgumentException if {@code source} is {@code null}
+     */
+    public WorkpoolEvent(Object source) {
+        // Delegating keeps the "no node" representation ("") identical to
+        // the three-argument constructor instead of leaving it null.
+        this(source, 0, 0);
+    }
+
+    /**
+     * Copy constructor: creates an event with the same source, type, worker
+     * count and node name as {@code ev}.
+     *
+     * @param ev the event to copy
+     * @throws NullPointerException if {@code ev} is {@code null}
+     * @throws IllegalArgumentException if the source of {@code ev} is
+     *         {@code null}
+     */
+    public WorkpoolEvent(WorkpoolEvent ev) {
+        this(ev.getSource(), ev.type, ev.noWorker, ev.nodeName);
+    }
+
+    /**
+     * Creates an event with an empty node name.
+     *
+     * @param source the object on which the event occurred
+     * @param typeEv the event type
+     * @param worker the worker count carried by the event
+     * @throws IllegalArgumentException if {@code source} is {@code null}
+     */
+    public WorkpoolEvent(Object source, int typeEv, int worker) {
+        this(source, typeEv, worker, "");
+    }
+
+    /**
+     * Creates a fully specified event.
+     *
+     * @param source the object on which the event occurred
+     * @param typeEv the event type
+     * @param worker the worker count carried by the event
+     * @param nodeName the node name carried by the event; stored as given
+     * @throws IllegalArgumentException if {@code source} is {@code null}
+     */
+    public WorkpoolEvent(Object source, int typeEv, int worker, String nodeName) {
+        super(source);
+        this.type = typeEv;
+        this.noWorker = worker;
+        this.nodeName = nodeName;
+    }
+
+    /**
+     * @return the node name carried by this event; the empty string when the
+     *         event was created without one
+     */
+    public String getNodeName() {
+        return nodeName;
+    }
+
+    /**
+     * @return the event type given at construction
+     */
+    public int getType() {
+        return type;
+    }
+
+    /**
+     * @return the worker count given at construction
+     */
+    public int workers() {
+        return noWorker;
+    }
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java?rev=637297&view=auto
==============================================================================
--- incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java (added)
+++ incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java Fri Mar 14 15:29:46 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.osoa.sca.ServiceReference;
+import org.osoa.sca.annotations.OneWay;
+import org.osoa.sca.annotations.Remotable;
+
+/**
+ * Remotable management interface of a workpool.
+ * <p>
+ * NOTE(review): only the declarations are visible here. The per-method
+ * descriptions below are inferred from the method names and the original
+ * comment; confirm them against the implementing class.
+ */
+@Remotable
+public interface WorkpoolManager {
+ /**
+ * Hands the autonomic rules to the manager. Declared one-way.
+ *
+ * @param rules the autonomic rules, in the format of a Java Drools
+ * <code>.drl</code> file. The original note says "You have to read
+ * it" - presumably the caller reads the file and passes its
+ * contents here; TODO confirm.
+ */
+ @OneWay
+ void acceptRules(String rules);
+
+ /**
+ * Declared one-way. Presumably starts the manager; what exactly is started
+ * is not visible here - TODO confirm.
+ */
+ @OneWay
+ void start();
+
+ /**
+ * Declared one-way. Presumably stops the autonomic cycle - TODO confirm.
+ */
+ @OneWay
+ void stopAutonomicCycle();
+
+ /**
+ * Declared one-way. Presumably (re)starts the autonomic cycle - TODO
+ * confirm.
+ */
+ @OneWay
+ void startAutonomicCycle();
+
+ /**
+ * Not declared one-way, so a result is returned to the caller.
+ *
+ * @return presumably the number of currently active workers - TODO confirm
+ */
+ int activeWorkers();
+
+ /**
+ * @param time the cycle time; presumably for the autonomic cycle. The unit
+ * is not stated here - TODO confirm.
+ */
+ void setCycleTime(long time);
+
+ /**
+ * @param serviceReference reference to a {@code WorkpoolService};
+ * presumably the workpool this manager controls - TODO confirm
+ */
+ void setWorkpoolReference(ServiceReference<WorkpoolService> serviceReference);
+}
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
---------------------------------------------------------------------
To unsubscribe, e-mail: tuscany-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: tuscany-commits-help@ws.apache.org