You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openjpa.apache.org by mi...@apache.org on 2011/06/20 17:09:37 UTC
svn commit: r1137651 -
/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java
Author: mikedd
Date: Mon Jun 20 15:09:37 2011
New Revision: 1137651
URL: http://svn.apache.org/viewvc?rev=1137651&view=rev
Log:
OPENJPA-2008: Setting svn eol-style for DistributedSQLStoreQuery
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java (contents, props changed)
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java?rev=1137651&r1=1137650&r2=1137651&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java Mon Jun 20 15:09:37 2011
@@ -1,293 +1,293 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.openjpa.slice.jdbc;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.openjpa.jdbc.kernel.JDBCStore;
-import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
-import org.apache.openjpa.jdbc.kernel.SQLStoreQuery;
-import org.apache.openjpa.kernel.AbstractStoreQuery;
-import org.apache.openjpa.kernel.BrokerImpl;
-import org.apache.openjpa.kernel.ExpressionStoreQuery;
-import org.apache.openjpa.kernel.FetchConfiguration;
-import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
-import org.apache.openjpa.kernel.QueryContext;
-import org.apache.openjpa.kernel.QueryImpl;
-import org.apache.openjpa.kernel.StoreManager;
-import org.apache.openjpa.kernel.StoreQuery;
-import org.apache.openjpa.kernel.exps.ExpressionParser;
-import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
-import org.apache.openjpa.lib.rop.RangeResultObjectProvider;
-import org.apache.openjpa.lib.rop.ResultObjectProvider;
-import org.apache.openjpa.meta.ClassMetaData;
-import org.apache.openjpa.slice.DistributedConfiguration;
-import org.apache.openjpa.slice.SliceThread;
-import org.apache.openjpa.util.StoreException;
-
-/**
- * A query for distributed databases.
- *
- * @author Pinaki Poddar
- *
- */
-@SuppressWarnings("serial")
-class DistributedSQLStoreQuery extends SQLStoreQuery {
- private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
-
- public DistributedSQLStoreQuery(JDBCStore store) {
- super(store);
- }
-
- void add(StoreQuery q) {
- _queries.add(q);
- }
-
- public DistributedJDBCStoreManager getDistributedStore() {
- return (DistributedJDBCStoreManager) getStore();
- }
-
- public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
- boolean parallel = !getContext().getStoreContext().getBroker()
- .getMultithreaded();
- ParallelExecutor ex = new ParallelExecutor(this, meta, parallel);
- for (StoreQuery q : _queries) {
- ex.addExecutor(q.newDataStoreExecutor(meta, subs));
- }
- return ex;
- }
-
- public void setContext(QueryContext ctx) {
- super.setContext(ctx);
- for (StoreQuery q : _queries)
- q.setContext(ctx);
- }
-
- /**
- * Executes queries on multiple databases.
- *
- * @author Pinaki Poddar
- *
- */
- public static class ParallelExecutor extends
- SQLStoreQuery.SQLExecutor {
- private List<Executor> executors = new ArrayList<Executor>();
- private DistributedSQLStoreQuery owner = null;
-
- public ParallelExecutor(DistributedSQLStoreQuery dsq, ClassMetaData meta, boolean p) {
- super(dsq, meta);
- owner = dsq;
- }
-
- public void addExecutor(Executor ex) {
- executors.add(ex);
- }
-
- /**
- * Each child query must be executed with slice context and not the
- * given query context.
- */
- public ResultObjectProvider executeQuery(StoreQuery q,
- final Object[] params, final Range range) {
- List<Future<ResultObjectProvider>> futures =
- new ArrayList<Future<ResultObjectProvider>>();
- final List<Executor> usedExecutors = new ArrayList<Executor>();
- final List<ResultObjectProvider> rops =
- new ArrayList<ResultObjectProvider>();
- List<SliceStoreManager> targets = findTargets();
- QueryContext ctx = q.getContext();
- boolean isReplicated = containsReplicated(ctx);
- ExecutorService threadPool = SliceThread.getPool();
- for (int i = 0; i < owner._queries.size(); i++) {
- // if replicated, then execute only on single slice
- if (isReplicated && !usedExecutors.isEmpty()) {
- break;
- }
- StoreManager sm = owner.getDistributedStore().getSlice(i);
- if (!targets.contains(sm))
- continue;
- StoreQuery query = owner._queries.get(i);
- Executor executor = executors.get(i);
- if (!targets.contains(sm))
- continue;
- usedExecutors.add(executor);
- QueryExecutor call = new QueryExecutor();
- call.executor = executor;
- call.query = query;
- call.params = params;
- call.range = range;
- futures.add(threadPool.submit(call));
- }
- for (Future<ResultObjectProvider> future : futures) {
- try {
- rops.add(future.get());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new StoreException(e.getCause());
- }
- }
-
- ResultObjectProvider[] tmp = rops
- .toArray(new ResultObjectProvider[rops.size()]);
- ResultObjectProvider result = null;
- boolean[] ascending = getAscending(q);
- boolean isAscending = ascending.length > 0;
- boolean isAggregate = ctx.isAggregate();
- boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
- if (isAggregate) {
- result = new UniqueResultObjectProvider(tmp, q,
- getQueryExpressions());
- } else if (isAscending) {
- result = new OrderingMergedResultObjectProvider(tmp, ascending,
- usedExecutors.toArray(new Executor[usedExecutors.size()]),
- q, params);
- } else {
- result = new MergedResultObjectProvider(tmp);
- }
- if (hasRange) {
- result = new RangeResultObjectProvider(result,
- ctx.getStartRange(), ctx.getEndRange());
- }
- return result;
- }
-
- /**
- * Scans metadata to find out if a replicated class is the candidate.
- */
- boolean containsReplicated(QueryContext query) {
- Class<?> candidate = query.getCandidateType();
- DistributedConfiguration conf = (DistributedConfiguration)query.getStoreContext()
- .getConfiguration();
- if (candidate != null) {
- return conf.isReplicated(candidate);
- }
- ClassMetaData[] metas = query.getAccessPathMetaDatas();
- if (metas == null || metas.length < 1)
- return false;
- for (ClassMetaData meta : metas)
- if (conf.isReplicated(meta.getDescribedType()))
- return true;
- return false;
- }
-
- public Number executeDelete(StoreQuery q, Object[] params) {
- Iterator<StoreQuery> qs = owner._queries.iterator();
- List<Future<Number>> futures = null;
- int result = 0;
- ExecutorService threadPool = SliceThread.getPool();
- for (Executor ex : executors) {
- if (futures == null)
- futures = new ArrayList<Future<Number>>();
- DeleteExecutor call = new DeleteExecutor();
- call.executor = ex;
- call.query = qs.next();
- call.params = params;
- futures.add(threadPool.submit(call));
- }
- for (Future<Number> future : futures) {
- try {
- Number n = future.get();
- if (n != null)
- result += n.intValue();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new StoreException(e.getCause());
- }
- }
- return result;
- }
-
- public Number executeUpdate(StoreQuery q, Object[] params) {
- Iterator<StoreQuery> qs = owner._queries.iterator();
- List<Future<Number>> futures = null;
- int result = 0;
- ExecutorService threadPool = SliceThread.getPool();
- for (Executor ex : executors) {
- if (futures == null)
- futures = new ArrayList<Future<Number>>();
- UpdateExecutor call = new UpdateExecutor();
- call.executor = ex;
- call.query = qs.next();
- call.params = params;
- futures.add(threadPool.submit(call));
- }
- for (Future<Number> future : futures) {
- try {
- Number n = future.get();
- result += (n == null) ? 0 : n.intValue();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new StoreException(e.getCause());
- }
- }
- return result;
- }
-
- List<SliceStoreManager> findTargets() {
- FetchConfiguration fetch = owner.getContext()
- .getFetchConfiguration();
- return owner.getDistributedStore().getTargets(fetch);
- }
-
- }
-
- static class QueryExecutor implements Callable<ResultObjectProvider> {
- StoreQuery query;
- Executor executor;
- Object[] params;
- Range range;
-
- public ResultObjectProvider call() throws Exception {
- return executor.executeQuery(query, params, range);
- }
- }
-
- static class DeleteExecutor implements Callable<Number> {
- StoreQuery query;
- Executor executor;
- Object[] params;
-
- public Number call() throws Exception {
- return executor.executeDelete(query, params);
- }
- }
-
- static class UpdateExecutor implements Callable<Number> {
- StoreQuery query;
- Executor executor;
- Object[] params;
-
- public Number call() throws Exception {
- return executor.executeUpdate(query, params);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.openjpa.jdbc.kernel.JDBCStore;
+import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.jdbc.kernel.SQLStoreQuery;
+import org.apache.openjpa.kernel.AbstractStoreQuery;
+import org.apache.openjpa.kernel.BrokerImpl;
+import org.apache.openjpa.kernel.ExpressionStoreQuery;
+import org.apache.openjpa.kernel.FetchConfiguration;
+import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
+import org.apache.openjpa.kernel.QueryContext;
+import org.apache.openjpa.kernel.QueryImpl;
+import org.apache.openjpa.kernel.StoreManager;
+import org.apache.openjpa.kernel.StoreQuery;
+import org.apache.openjpa.kernel.exps.ExpressionParser;
+import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
+import org.apache.openjpa.lib.rop.RangeResultObjectProvider;
+import org.apache.openjpa.lib.rop.ResultObjectProvider;
+import org.apache.openjpa.meta.ClassMetaData;
+import org.apache.openjpa.slice.DistributedConfiguration;
+import org.apache.openjpa.slice.SliceThread;
+import org.apache.openjpa.util.StoreException;
+
+/**
+ * A query for distributed databases.
+ *
+ * @author Pinaki Poddar
+ *
+ */
+@SuppressWarnings("serial")
+class DistributedSQLStoreQuery extends SQLStoreQuery {
+ private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
+
+ public DistributedSQLStoreQuery(JDBCStore store) {
+ super(store);
+ }
+
+ void add(StoreQuery q) {
+ _queries.add(q);
+ }
+
+ public DistributedJDBCStoreManager getDistributedStore() {
+ return (DistributedJDBCStoreManager) getStore();
+ }
+
+ public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
+ boolean parallel = !getContext().getStoreContext().getBroker()
+ .getMultithreaded();
+ ParallelExecutor ex = new ParallelExecutor(this, meta, parallel);
+ for (StoreQuery q : _queries) {
+ ex.addExecutor(q.newDataStoreExecutor(meta, subs));
+ }
+ return ex;
+ }
+
+ public void setContext(QueryContext ctx) {
+ super.setContext(ctx);
+ for (StoreQuery q : _queries)
+ q.setContext(ctx);
+ }
+
+ /**
+ * Executes queries on multiple databases.
+ *
+ * @author Pinaki Poddar
+ *
+ */
+ public static class ParallelExecutor extends
+ SQLStoreQuery.SQLExecutor {
+ private List<Executor> executors = new ArrayList<Executor>();
+ private DistributedSQLStoreQuery owner = null;
+
+ public ParallelExecutor(DistributedSQLStoreQuery dsq, ClassMetaData meta, boolean p) {
+ super(dsq, meta);
+ owner = dsq;
+ }
+
+ public void addExecutor(Executor ex) {
+ executors.add(ex);
+ }
+
+ /**
+ * Each child query must be executed with slice context and not the
+ * given query context.
+ */
+ public ResultObjectProvider executeQuery(StoreQuery q,
+ final Object[] params, final Range range) {
+ List<Future<ResultObjectProvider>> futures =
+ new ArrayList<Future<ResultObjectProvider>>();
+ final List<Executor> usedExecutors = new ArrayList<Executor>();
+ final List<ResultObjectProvider> rops =
+ new ArrayList<ResultObjectProvider>();
+ List<SliceStoreManager> targets = findTargets();
+ QueryContext ctx = q.getContext();
+ boolean isReplicated = containsReplicated(ctx);
+ ExecutorService threadPool = SliceThread.getPool();
+ for (int i = 0; i < owner._queries.size(); i++) {
+ // if replicated, then execute only on single slice
+ if (isReplicated && !usedExecutors.isEmpty()) {
+ break;
+ }
+ StoreManager sm = owner.getDistributedStore().getSlice(i);
+ if (!targets.contains(sm))
+ continue;
+ StoreQuery query = owner._queries.get(i);
+ Executor executor = executors.get(i);
+ if (!targets.contains(sm))
+ continue;
+ usedExecutors.add(executor);
+ QueryExecutor call = new QueryExecutor();
+ call.executor = executor;
+ call.query = query;
+ call.params = params;
+ call.range = range;
+ futures.add(threadPool.submit(call));
+ }
+ for (Future<ResultObjectProvider> future : futures) {
+ try {
+ rops.add(future.get());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new StoreException(e.getCause());
+ }
+ }
+
+ ResultObjectProvider[] tmp = rops
+ .toArray(new ResultObjectProvider[rops.size()]);
+ ResultObjectProvider result = null;
+ boolean[] ascending = getAscending(q);
+ boolean isAscending = ascending.length > 0;
+ boolean isAggregate = ctx.isAggregate();
+ boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
+ if (isAggregate) {
+ result = new UniqueResultObjectProvider(tmp, q,
+ getQueryExpressions());
+ } else if (isAscending) {
+ result = new OrderingMergedResultObjectProvider(tmp, ascending,
+ usedExecutors.toArray(new Executor[usedExecutors.size()]),
+ q, params);
+ } else {
+ result = new MergedResultObjectProvider(tmp);
+ }
+ if (hasRange) {
+ result = new RangeResultObjectProvider(result,
+ ctx.getStartRange(), ctx.getEndRange());
+ }
+ return result;
+ }
+
+ /**
+ * Scans metadata to find out if a replicated class is the candidate.
+ */
+ boolean containsReplicated(QueryContext query) {
+ Class<?> candidate = query.getCandidateType();
+ DistributedConfiguration conf = (DistributedConfiguration)query.getStoreContext()
+ .getConfiguration();
+ if (candidate != null) {
+ return conf.isReplicated(candidate);
+ }
+ ClassMetaData[] metas = query.getAccessPathMetaDatas();
+ if (metas == null || metas.length < 1)
+ return false;
+ for (ClassMetaData meta : metas)
+ if (conf.isReplicated(meta.getDescribedType()))
+ return true;
+ return false;
+ }
+
+ public Number executeDelete(StoreQuery q, Object[] params) {
+ Iterator<StoreQuery> qs = owner._queries.iterator();
+ List<Future<Number>> futures = null;
+ int result = 0;
+ ExecutorService threadPool = SliceThread.getPool();
+ for (Executor ex : executors) {
+ if (futures == null)
+ futures = new ArrayList<Future<Number>>();
+ DeleteExecutor call = new DeleteExecutor();
+ call.executor = ex;
+ call.query = qs.next();
+ call.params = params;
+ futures.add(threadPool.submit(call));
+ }
+ for (Future<Number> future : futures) {
+ try {
+ Number n = future.get();
+ if (n != null)
+ result += n.intValue();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new StoreException(e.getCause());
+ }
+ }
+ return result;
+ }
+
+ public Number executeUpdate(StoreQuery q, Object[] params) {
+ Iterator<StoreQuery> qs = owner._queries.iterator();
+ List<Future<Number>> futures = null;
+ int result = 0;
+ ExecutorService threadPool = SliceThread.getPool();
+ for (Executor ex : executors) {
+ if (futures == null)
+ futures = new ArrayList<Future<Number>>();
+ UpdateExecutor call = new UpdateExecutor();
+ call.executor = ex;
+ call.query = qs.next();
+ call.params = params;
+ futures.add(threadPool.submit(call));
+ }
+ for (Future<Number> future : futures) {
+ try {
+ Number n = future.get();
+ result += (n == null) ? 0 : n.intValue();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new StoreException(e.getCause());
+ }
+ }
+ return result;
+ }
+
+ List<SliceStoreManager> findTargets() {
+ FetchConfiguration fetch = owner.getContext()
+ .getFetchConfiguration();
+ return owner.getDistributedStore().getTargets(fetch);
+ }
+
+ }
+
+ static class QueryExecutor implements Callable<ResultObjectProvider> {
+ StoreQuery query;
+ Executor executor;
+ Object[] params;
+ Range range;
+
+ public ResultObjectProvider call() throws Exception {
+ return executor.executeQuery(query, params, range);
+ }
+ }
+
+ static class DeleteExecutor implements Callable<Number> {
+ StoreQuery query;
+ Executor executor;
+ Object[] params;
+
+ public Number call() throws Exception {
+ return executor.executeDelete(query, params);
+ }
+ }
+
+ static class UpdateExecutor implements Callable<Number> {
+ StoreQuery query;
+ Executor executor;
+ Object[] params;
+
+ public Number call() throws Exception {
+ return executor.executeUpdate(query, params);
+ }
+ }
+}
Propchange: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java
------------------------------------------------------------------------------
svn:eol-style = native