You are viewing a plain text version of this content. The canonical link for it is here.
Posted to olio-commits@incubator.apache.org by sh...@apache.org on 2009/03/18 22:44:04 UTC
svn commit: r755764 - in
/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader:
./ framework/
Author: shanti
Date: Wed Mar 18 22:44:03 2009
New Revision: 755764
URL: http://svn.apache.org/viewvc?rev=755764&view=rev
Log:
Fix for OLIO-60. This completes the syncing of the rails loader code with the php one.
Added:
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java
Modified:
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java
incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java Wed Mar 18 22:44:03 2009
@@ -32,11 +32,8 @@
return "truncate table events_users";
}
- public Attendees(int eventId) {
- this.eventId = ++eventId;
- }
-
public void prepare() {
+ eventId = getSequence() + 1;
ThreadResource tr = ThreadResource.getInstance();
Random r = tr.getRandom();
int attendees = r.random(10, 100);
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java Wed Mar 18 22:44:03 2009
@@ -32,15 +32,13 @@
int[] ratings;
Date createdTimestamp;
- public Comments(int eventId) {
- this.eventId = ++eventId;
- }
public String getClearStatement() {
return "truncate table comments";
}
public void prepare() {
+ eventId = getSequence() + 1;
ThreadResource tr = ThreadResource.getInstance();
Random r = tr.getRandom();
int commentCount = r.random(0, 20);
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java Wed Mar 18 22:44:03 2009
@@ -26,15 +26,13 @@
int eventId;
- public Documents(int eventId) {
- this.eventId = ++eventId;
- }
-
public String getClearStatement() {
return "truncate table documents";
}
public void prepare() {
+ eventId = getSequence();
+ ++eventId;
}
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java Wed Mar 18 22:44:03 2009
@@ -29,15 +29,12 @@
int eventId;
int [] tagIds;
- public EventTag(int eventId) {
- this.eventId = ++eventId;
- }
-
public String getClearStatement() {
return "truncate table taggings";
}
public void prepare() {
+ eventId = getSequence() + 1;
ThreadResource tr = ThreadResource.getInstance();
Random r = tr.getRandom();
int numTags = r.random(1, 7); // Avg is 4 tags per event
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java Wed Mar 18 22:44:03 2009
@@ -27,15 +27,12 @@
int id;
int[] friends;
- public Friends(int id) {
- this.id = ++id;
- }
-
public String getClearStatement() {
return "truncate table invites";
}
public void prepare() {
+ id = getSequence() + 1;
ThreadResource tr = ThreadResource.getInstance();
Random r = tr.getRandom();
int count = r.random(2, 28);
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java Wed Mar 18 22:44:03 2009
@@ -15,7 +15,7 @@
/**
* Comments Loader.
*/
-public class Images extends Loadable {
+public abstract class Images extends Loadable {
// We use on average of 10 comments per event. Random 0..20 comments..
private static final String STATEMENT = "insert into images " +
@@ -24,20 +24,14 @@
static Logger logger = Logger.getLogger(Comments.class.getName());
- int eventId;
+ int imageId;
String prefix;
- public Images(int eventId, String prefix) {
- this.eventId = ++eventId;
- this.prefix = prefix;
- }
-
public String getClearStatement() {
return "truncate table images";
}
- public void prepare() {
- }
+ public abstract void prepare();
public void load() {
@@ -46,11 +40,11 @@
PreparedStatement s = c.prepareStatement(STATEMENT);
s.setInt(1, 671614);
s.setString(2, "application/jpg");
- s.setString(3, prefix + eventId + ".jpg");
+ s.setString(3, prefix + imageId + ".jpg");
s.setInt(4, 1280);
s.setInt(5, 960);
- s.setString(6, prefix + eventId + "t.jpg");
- s.setInt(7, this.eventId);
+ s.setString(6, prefix + imageId + "t.jpg");
+ s.setInt(7, this.imageId);
c.addBatch();
} catch (SQLException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: LoadController.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
package org.apache.olio.workload.loader;
import org.apache.olio.workload.util.ScaleFactors;
@@ -16,44 +35,36 @@
ScaleFactors.setActiveUsers(Integer.parseInt(args[2]));
// Clear the database
- logger.info("Clearing database tables.");
- clear(new Person(0, 0));
- clear(new Friends(0));
- clear(new Address());
- clear(new Tag(0));
- clear(new SocialEvent(0));
- clear(new EventTag(0));
- clear(new Attendees(0));
- clear(new Comments(0));
- clear(new Documents(0));
- clear(new Images(0, ""));
+ clear(Person.class);
+ clear(Friends.class);
+ clear(Address.class);
+ clear(Tag.class);
+ clear(SocialEvent.class);
+ clear(EventTag.class);
+ clear(Attendees.class);
+ clear(Comments.class);
+ clear(Documents.class);
+ clear(PersonImages.class);
+ clear(EventImages.class);
+ logger.info("Done clearing database tables.");
// load person, friends, and addresses
- logger.info("Creating persons, friends, and addresses.");
- for (int i = 0; i < ScaleFactors.users; i++) {
- int imageId = ScaleFactors.events + i + 1;
- load(new Person(i, imageId));
- load(new Friends(i));
- load(new Address());
- load(new Images(imageId, "p"));
- }
+ load(Person.class, ScaleFactors.users);
+ load(Friends.class, ScaleFactors.users);
+ load(Address.class, ScaleFactors.users);
+ // load(new Images(imageId, "p"));
+ load(PersonImages.class, ScaleFactors.users);
// load tags
- logger.info("Creating tags.");
- for (int i = 0; i < ScaleFactors.tagCount; i++) {
- load(new Tag(i));
- }
+ load(Tag.class, ScaleFactors.tagCount);
// load events and all relationships to events
- logger.info("Creating events, attendees, comments.");
- for (int i = 0; i < ScaleFactors.events; i++) {
- load(new SocialEvent(i));
- load(new Documents(i));
- load(new Images(i, "e"));
- load(new EventTag(i));
- load(new Attendees(i));
- load(new Comments(i));
- }
+ load(SocialEvent.class, ScaleFactors.events);
+ load(EventTag.class, ScaleFactors.events);
+ load(Attendees.class, ScaleFactors.events);
+ load(Comments.class, ScaleFactors.events);
+ load(EventImages.class, ScaleFactors.events);
+ load(Documents.class, ScaleFactors.events);
waitProcessing();
logger.info("Done data creation.");
@@ -64,7 +75,7 @@
// We use a new set of connections and thread pools for postLoad.
// This is to ensure all load tasks are done before this one starts.
- postLoad(new Tag(0));
+ postLoad(Tag.class);
shutdown();
logger.info("Done post-load.");
System.exit(0); // Signal successful loading.
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java Wed Mar 18 22:44:03 2009
@@ -29,16 +29,15 @@
String[] fields = new String[10];
int addressId, thumbnail, imageId;
- public Person(int id, int imageId) {
- this.id = ++id;
- this.imageId = imageId;
- }
public String getClearStatement() {
return "truncate table users";
}
public void prepare() {
+ id = getSequence();
+ ++id;
+ imageId = ScaleFactors.events + id;
ThreadResource tr = ThreadResource.getInstance();
Random r = tr.getRandom();
StringBuilder b = tr.getBuffer();
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java Wed Mar 18 22:44:03 2009
@@ -38,15 +38,13 @@
Date createdTimestamp;
int[] ifields = new int[7];
- public SocialEvent(int id) {
- this.id = ++id;
- }
public String getClearStatement() {
return "truncate table events";
}
public void prepare() {
+ id = getSequence() + 1;
ThreadResource tr = ThreadResource.getInstance();
Random r = tr.getRandom();
StringBuilder buffer = tr.getBuffer();
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java Wed Mar 18 22:44:03 2009
@@ -25,15 +25,13 @@
int id;
String tag;
- public Tag(int id) {
- this.id = ++id;
- }
public String getClearStatement() {
return "truncate table tags";
}
public void prepare() {
+ id = getSequence() + 1;
tag = UserName.getUserName(id);
}
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java Wed Mar 18 22:44:03 2009
@@ -1,8 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: Loadable.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
package org.apache.olio.workload.loader.framework;
public abstract class Loadable {
- protected Loader loader = Loader.getInstance(getClass().getName());
+ // Sequence is set by the pool.
+ int sequence;
+
+ protected Loader loader = Loader.getInstance(getClass());
+ LoadablePool<? extends Loadable> pool;
+
+ /**
+ * Obtains the sequence, starting from 0, of this loader.
+ *
+ * @return The sequence of this loadable.
+ */
+ protected int getSequence() {
+ return sequence;
+ }
public abstract String getClearStatement();
Added: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java?rev=755764&view=auto
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java (added)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java Wed Mar 18 22:44:03 2009
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id$
+ */
+package org.apache.olio.workload.loader.framework;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author akara
+ */
+public class LoadablePool<T extends Loadable> {
+
+ private static Logger logger = Logger.getLogger(LoadablePool.class.getName());
+ LinkedBlockingDeque<T> pool = new LinkedBlockingDeque<T>();
+ int sequence;
+ int count = 0;
+ int size;
+ Class<T> clazz;
+
+ public LoadablePool(int size, Class<T> clazz) {
+ this.size = size;
+ this.clazz = clazz;
+ }
+
+ public T getLoadable() throws Exception {
+ T loadable = pool.poll();
+ if (loadable == null) {
+ if (count < size) {
+ loadable = clazz.newInstance();
+ loadable.pool = this;
+ ++count;
+ } else {
+ for (;;) {
+ try {
+ loadable = pool.take();
+ break;
+ } catch (InterruptedException ex) {
+ logger.log(Level.WARNING, "getLoader interrupted", ex);
+ }
+ }
+ }
+ }
+ loadable.sequence = sequence++;
+ return loadable;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void putLoader(Loadable loadable) {
+ for (;;) {
+ try {
+ // User a LIFO model to keep the hot objects in cache.
+ pool.putFirst((T) loadable);
+ break;
+ } catch (InterruptedException ex) {
+ logger.log(Level.WARNING, "putLoader interrupted!", ex);
+ }
+ }
+ }
+}
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: Loader.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
package org.apache.olio.workload.loader.framework;
import java.sql.SQLException;
@@ -5,6 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.ArrayList;
/**
* The loader, one instance per Loadable type loaded, is responsible
@@ -19,8 +39,11 @@
/** The batch size of a single batch. */
public static final int BATCHSIZE = 1000;
+ /** The recycling pool size is 3 times the size of the batch. */
+ public static final int POOLSIZE = 3 * BATCHSIZE;
+
/** The number of errors before exiting. */
- public static final int ERROR_THRESHOLD = 100;
+ public static final int ERROR_THRESHOLD = 50;
public static final int LOAD_THREADS = 5;
@@ -32,6 +55,11 @@
private static ConcurrentHashMap<String, Loader> typeMap =
new ConcurrentHashMap<String, Loader>();
+ private static ConcurrentHashMap<Class, LoadablePool> poolMap =
+ new ConcurrentHashMap<Class, LoadablePool>();
+
+ private static ArrayList<Thread> mainLoaders = new ArrayList<Thread>();
+
// This is a single processing pool for processing data preps.
private static ExecutorService processor =
Executors.newCachedThreadPool();
@@ -39,19 +67,22 @@
private String name;
AtomicInteger loadCount;
+ LoadablePool<? extends Loadable> loadablePool;
+
// A Loadable type database loading pool.
ExecutorService pool;
ConcurrentLinkedQueue<Loadable> queue;
/**
* Obtains the instance of the loader for a given loadable type.
- * @param name The loadable type name
+ * @param clazz The loadable type
* @return The loader for this type name, or a new loader if none exists
*/
- static Loader getInstance(String name) {
+ static Loader getInstance(Class<? extends Loadable> clazz) {
// We may need to change this to a configurable thread pool size
// on a per-type basis. This is the only place to change.
+ String name = clazz.getName();
Loader loader = new Loader();
Loader oldEntry = typeMap.putIfAbsent(name, loader);
@@ -77,6 +108,19 @@
// pool = Executors.newCachedThreadPool();
}
+ private static <T extends Loadable> LoadablePool<T>
+ getLoadablePool(Class<T> clazz) {
+ LoadablePool<T> pool = new LoadablePool<T>(3 * BATCHSIZE, clazz);
+ @SuppressWarnings("unchecked")
+ LoadablePool<T> oldEntry = poolMap.putIfAbsent(clazz, pool);
+
+ if (oldEntry != null) {
+ pool = oldEntry;
+ }
+
+ return pool;
+ }
+
/**
* Sets the URL for the connection to the database.
* @param url The connection URL
@@ -95,10 +139,21 @@
/**
* Uses the loadable to clear the database through the loadable's
* clear statement.
- * @param l The loadable to use
+ * @param clazz The loadable class to use
*/
- public static void clear(final Loadable l) {
+ public static void clear(Class<? extends Loadable> clazz) {
+ Loadable loadable = null;
+ try {
+ loadable = clazz.newInstance();
+ } catch (Exception ex) {
+ logger.log(Level.SEVERE, "Error instantiating loader class.", ex);
+ increaseErrorCount();
+ }
+
+ if (loadable != null) {
+ final Loadable l = loadable;
Future f = l.loader.pool.submit(new Runnable() {
+
public void run() {
ThreadConnection c = ThreadConnection.getInstance();
try {
@@ -115,11 +170,12 @@
try {
Thread.sleep(200);
} catch (InterruptedException e) {
- logger.log(Level.WARNING, l.loader.name + ": Interrupted while " +
- "waiting to clear table.", e);
+ logger.log(Level.WARNING, l.loader.name + ": Interrupted " +
+ "while waiting to clear table.", e);
}
}
}
+ }
/**
* Loads the loadable into the database. Note that the loading is done
@@ -129,10 +185,29 @@
* will gracefully shut down the processing infrastructure and wait until
* all preparation is done. Shutdown will wait until all data loading
* is done.
- * @param l
+ * @param clazz The loadable class
+ * @param occurrences The number of load iterations
*/
- public static void load(final Loadable l) {
+ public static void load(Class<? extends Loadable> clazz, int occurrences) {
+
+ final Class<? extends Loadable> c = clazz;
+ final int occ = occurrences;
+ Thread mainLoader = new Thread() {
+
+ @Override
+ public void run() {
+ for (int i = 0; i < occ; i++) {
+ Loadable loadable = null;
+ try {
+ loadable = getLoadablePool(c).getLoadable();
+ } catch (Exception ex) {
+ logger.log(Level.SEVERE, "Error obtaining loadable", ex);
+ increaseErrorCount();
+ }
+ if (loadable != null) {
+ final Loadable l = loadable;
processor.execute(new Runnable() {
+
public void run() {
try {
l.prepare();
@@ -144,13 +219,35 @@
}
});
}
+ }
+ }
+ };
+ mainLoaders.add(mainLoader);
+ mainLoader.start();
+ }
+
+ public static void exec(Runnable r) {
+ processor.execute(r);
+ }
/**
* Execute the post loads provided by the loadable.
- * @param l The loadable.
+ * @param clazz The loadable class
*/
- public static void postLoad(final Loadable l) {
+ public static void postLoad(Class<? extends Loadable> clazz) {
+ Loadable loadable = null;
+ try {
+ loadable = clazz.newInstance();
+ } catch (Exception ex) {
+ logger.log(Level.SEVERE, "Error instantiating loader class.", ex);
+ increaseErrorCount();
+ }
+
+ if (loadable != null) {
+
+ final Loadable l = loadable;
l.loader.pool.submit(new Runnable() {
+
public void run() {
try {
l.postLoad();
@@ -162,6 +259,7 @@
}
});
}
+ }
private void add(Loadable l) {
@@ -185,6 +283,16 @@
* preparation is done.
*/
public static void waitProcessing() {
+ // Wait for the main loaders
+ for (Thread mainLoader : mainLoaders) {
+ for (;;)
+ try {
+ mainLoader.join();
+ break;
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ }
+ }
// We ensure the process pool is cleared, first.
if (processor != null) {
processor.shutdown();
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: ThreadConnection.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
package org.apache.olio.workload.loader.framework;
import java.sql.*;
@@ -24,7 +43,7 @@
};
private static boolean COMMIT_TX = Boolean.parseBoolean(
- System.getProperty("commit.tx", "true"));
+ System.getProperty("commit.tx", "true"));
private static final List<ThreadConnection> CONNECTIONLIST =
Collections.synchronizedList(new ArrayList<ThreadConnection>());
@@ -63,7 +82,6 @@
try {
if (conn == null || conn.isClosed()) {
conn = DriverManager.getConnection(connectionURL);
- //conn.setAutoCommit(false);
statement = null;
statementText = null;
}
@@ -118,7 +136,8 @@
}
}
- void processBatch(String name, int batchCount, Queue<Loadable> queue) {
+ void processBatch(String name, int batchCount,
+ Queue<? extends Loadable> queue) {
// First we need to save the load objects from the queue
// so we do not loose them in case we need to retry.
if (batchBuffer == null) {
@@ -148,7 +167,7 @@
batchName = "final " + count + " object batch.";
int flushed = 0;
- for (int retry = 0; retry < 10; retry++) {
+ for (int retry = 0; retry < 2; retry++) {
try {
for (int i = flushed; i < count; i++) {
batchBuffer[i].load();
@@ -175,7 +194,7 @@
logger.fine(name + ": Loaded " + batchName);
break; // We won't retry if everything is OK.
} catch (BatchUpdateException e) {
- if (retry < 10) {
+ if (retry == 0) {
resetConnection();
logger.log(Level.WARNING, name +
": Retry loading.", e);
@@ -205,11 +224,12 @@
}
// Once we're done with this buffer, don't hold on to the objects.
- // Let them get GC'd so we don't bloat memory. Minimal CPU cost
- // for such tight loop and setting all entries to null.
- for (int i = 0; i < batchBuffer.length; i++)
+ // Return them to the pool so we don't bloat memory.
+ for (int i = 0; i < batchBuffer.length; i++) {
+ batchBuffer[i].pool.putLoader(batchBuffer[i]);
batchBuffer[i] = null;
}
+ }
void flush() throws SQLException {
statement.executeBatch();
@@ -227,7 +247,6 @@
}
static void closeConnections() {
- System.out.println("Closing connection");
synchronized (CONNECTIONLIST) {
for (ThreadConnection c : CONNECTIONLIST)
try {
Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: ThreadResource.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
package org.apache.olio.workload.loader.framework;
import com.sun.faban.driver.util.Random;