You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2010/07/13 04:12:21 UTC
svn commit: r963573 -
/incubator/oodt/trunk/query/src/main/java/org/apache/oodt/QueryEngine.java
Author: mattmann
Date: Tue Jul 13 02:12:21 2010
New Revision: 963573
URL: http://svn.apache.org/viewvc?rev=963573&view=rev
Log:
- progress towards OODT-15 One trunk for all OODT components with top level build
Added:
incubator/oodt/trunk/query/src/main/java/org/apache/oodt/QueryEngine.java (with props)
Added: incubator/oodt/trunk/query/src/main/java/org/apache/oodt/QueryEngine.java
URL: http://svn.apache.org/viewvc/incubator/oodt/trunk/query/src/main/java/org/apache/oodt/QueryEngine.java?rev=963573&view=auto
==============================================================================
--- incubator/oodt/trunk/query/src/main/java/org/apache/oodt/QueryEngine.java (added)
+++ incubator/oodt/trunk/query/src/main/java/org/apache/oodt/QueryEngine.java Tue Jul 13 02:12:21 2010
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jpl.eda.query;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import jpl.eda.product.ProductClient;
+import jpl.eda.product.ProductException;
+import jpl.eda.profile.Profile;
+import jpl.eda.profile.ProfileClient;
+import jpl.eda.profile.ProfileException;
+import jpl.eda.xmlquery.XMLQuery;
+import jpl.eda.xmlquery.Statistic;
+
+import jpl.eda.profile.*;
+
+/**
+ * A query engine runs queries.
+ *
+ * The query service uses the query engine to actually run profile and product
+ * queries. You can use the query engine directly, too, if you don't need a
+ * query server and can access profile and product servers directly.
+ *
+ */
+public class QueryEngine implements QueryService.Server {
+ public List queryProfileServers(XMLQuery query, List servers)
+ throws QueryException {
+ // check
+ if (query == null)
+ throw new IllegalArgumentException("Query required");
+ if (servers == null)
+ throw new IllegalArgumentException("Servers required");
+ if (servers.isEmpty())
+ return Collections.EMPTY_LIST;
+ Set queriedServers = new HashSet(servers.size());
+ LinkedList queriers = new LinkedList();
+
+ // sow
+ for (Iterator i = servers.iterator(); i.hasNext();) {
+ String serverID = (String) i.next();
+ launchQuerier(queriers, queriedServers, query, serverID);
+ }
+
+ List results = new ArrayList();
+ QueryException queryException = null;
+
+ // reap
+ for (;;) {
+ Querier q = null;
+ synchronized (queriedServers) {
+ if (queriers.isEmpty())
+ break;
+ q = (Querier) queriers.removeFirst();
+ }
+
+ for (;;)
+ try {
+ q.join();
+ QueryException qe = q.getException();
+ if (qe != null)
+ queryException = qe;
+ else
+ results.addAll(q.getResults());
+ break;
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+ this.query = query;
+
+ // report
+ if (queryException != null)
+ throw queryException;
+
+ return results;
+ }
+
+ public List queryDefaultProfileServers(XMLQuery query) throws QueryException {
+ return queryProfileServers(query, getDefaultProfileServerIDs());
+ }
+
+ public XMLQuery queryProductServer(XMLQuery query, String serverID)
+ throws QueryException {
+ if (query == null)
+ throw new IllegalArgumentException("Query required");
+ if (serverID == null)
+ throw new IllegalArgumentException("Server ID required");
+ try {
+ System.err.println("=== Constructing ProductClient for server "
+ + serverID);
+ ProductClient pc = new ProductClient(serverID);
+ System.err.println("Constructed, calling query with expr = "
+ + query.getKwdQueryString());
+ XMLQuery r = pc.query(query);
+ System.err.println("Called; returning " + r.getResults().size()
+ + " result(s)");
+ return r;
+ } catch (ProductException ex) {
+ System.err.println("--- Exception!");
+ ex.printStackTrace();
+ throw new QueryException(ex);
+ }
+ }
+
+ public XMLQuery getQuery() {
+ return query;
+ }
+
+ public byte[] retrieveChunk(String productID, long offset, int length,
+ String serverID) throws QueryException {
+ if (productID == null)
+ throw new IllegalArgumentException("Product ID required");
+ if (serverID == null)
+ throw new IllegalArgumentException("Server ID required");
+ if (offset < 0)
+ throw new IllegalArgumentException("Nonnegative offset required");
+ if (length <= 0)
+ throw new IllegalArgumentException("Positive size required");
+ try {
+ ProductClient pc = new ProductClient(serverID);
+ return pc.retrieveChunk(productID, offset, length);
+ } catch (ProductException ex) {
+ throw new QueryException(ex);
+ }
+ }
+
+ public void close(String productID, String serverID) throws QueryException {
+ if (productID == null)
+ throw new IllegalArgumentException("Product ID required");
+ if (serverID == null)
+ throw new IllegalArgumentException("Server ID required");
+ try {
+ ProductClient pc = new ProductClient(serverID);
+ pc.close(productID);
+ } catch (ProductException ex) {
+ throw new QueryException(ex);
+ }
+ }
+
+ /**
+ * Return the list of default profile servers.
+ *
+ * The list of default profile servers comes from the comma-separated list of
+ * names in the <code>jpl.eda.query.profileServers</code> system property. If
+ * undefined, it defaults to <code>urn:eda:rmi:JP.Profile</code>.
+ *
+ * @return a {@link List} of {@link String} profile server IDs.
+ */
+ private static List getDefaultProfileServerIDs() {
+ while (defaultServerIDs == null)
+ synchronized (QueryEngine.class) {
+ if (defaultServerIDs == null) {
+ defaultServerIDs = new ArrayList();
+ String values = System.getProperty("jpl.eda.query.profileServers",
+ "urn:eda:rmi:JPL.Profile");
+ for (Iterator i = jpl.eda.util.Utility.parseCommaList(values); i
+ .hasNext();)
+ defaultServerIDs.add(i.next());
+ }
+ }
+ return defaultServerIDs;
+ }
+
+ /**
+ * Launch a new profile querier.
+ *
+ * This method avoids launching a querier at servers that have been or are
+ * currently being queried.
+ *
+ * @param queriers
+ * List of current queriers.
+ * @param queriedServers
+ * Set of {@link String} server IDs queried.
+ * @param query
+ * Query.
+ * @param serverID
+ * What server to query.
+ */
+ private static void launchQuerier(LinkedList queriers, Set queriedServers,
+ XMLQuery query, String serverID) {
+ Querier q;
+ synchronized (queriedServers) {
+ if (queriedServers.contains(serverID))
+ return;
+ queriedServers.add(serverID);
+ q = new Querier(queriers, queriedServers, query, serverID);
+ queriers.addLast(q);
+ }
+ q.start();
+ }
+
+ /** List of {@link String} profile server IDs to query by default. */
+ private static List defaultServerIDs;
+
+ /** Query to use. */
+ private XMLQuery query;
+
+ /**
+ * A querier is a thread that runs a query at a profile server.
+ */
+ private static class Querier extends Thread {
+ /**
+ * Creates a new <code>Querier</code> instance.
+ *
+ * @param queriers
+ * List of other queriers.
+ * @param queriedServers
+ * Set of {@link String} server IDs being queried.
+ * @param query
+ * Query.
+ * @param serverID
+ * Server to query.
+ */
+ Querier(LinkedList queriers, Set queriedServers, XMLQuery query,
+ String serverID) {
+ super("Querying " + serverID + " for " + query.getKwdQueryString());
+ this.queriers = queriers;
+ this.queriedServers = queriedServers;
+ this.query = query;
+ this.serverID = serverID;
+ }
+
+ /**
+ * Get any exception from running this query.
+ *
+ * This method may be called only after this thread terminates.
+ *
+ * @return a {@link QueryException} value.
+ */
+ public QueryException getException() {
+ if (isAlive())
+ throw new IllegalStateException("Join thread first");
+ return queryException;
+ }
+
+ /**
+ * Get any results of this query.
+ *
+ * This method may be called only after this thread terminates.
+ *
+ * @return a {@link List} of {@link Profile}s.
+ */
+ public List getResults() {
+ if (isAlive())
+ throw new IllegalStateException("Join thread first");
+ return results;
+ }
+
+ /**
+ * Start querying, adding new queriers for any new profile servers
+ * discovered.
+ */
+ public void run() {
+ try {
+ ProfileClient pc = new ProfileClient(serverID);
+
+ long time = System.currentTimeMillis();
+ results = pc.query(query);
+ long searchTime = System.currentTimeMillis() - time; // for performance
+ // evaluation
+ List searchTimeList;
+ synchronized (searchTimeList = query.getStatistics()) {
+ Statistic sta = new Statistic(serverID, searchTime);
+ searchTimeList.add(sta);
+ }
+
+ for (Iterator i = results.iterator(); i.hasNext();) {
+ Profile p = (Profile) i.next();
+ if ("system.profileServer".equals(p.getResourceAttributes()
+ .getResClass())) {
+ i.remove();
+ for (Iterator j = p.getResourceAttributes().getResLocations()
+ .iterator(); j.hasNext();) {
+ String otherServerID = (String) j.next();
+ launchQuerier(queriers, queriedServers, query, otherServerID);
+ }
+ }
+ }
+ } catch (ProfileException ex) {
+ queryException = new QueryException(ex);
+ } catch (RuntimeException ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+ if (!(obj instanceof Querier))
+ return false;
+ Querier rhs = (Querier) obj;
+ return serverID.equals(rhs.serverID);
+ }
+
+ /** Queriers currently running. */
+ private LinkedList queriers;
+
+ /** List of {@link Profile}s found at this server. */
+ private List results;
+
+ /** If nonnull, any exception generated as a result of running the query. */
+ private QueryException queryException;
+
+ /** Servers queried so far. */
+ private Set queriedServers;
+
+ /** Server I'm querying. */
+ private String serverID;
+
+ /** Query to use. */
+ private XMLQuery query;
+ }
+}
Propchange: incubator/oodt/trunk/query/src/main/java/org/apache/oodt/QueryEngine.java
------------------------------------------------------------------------------
svn:eol-style = native