You are viewing a plain text version of this content. The canonical link for it is here.
Posted to olio-commits@incubator.apache.org by sh...@apache.org on 2009/03/18 22:44:04 UTC

svn commit: r755764 - in /incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader: ./ framework/

Author: shanti
Date: Wed Mar 18 22:44:03 2009
New Revision: 755764

URL: http://svn.apache.org/viewvc?rev=755764&view=rev
Log:
Fix for OLIO-60. This completes the syncing of the rails loader code with the php one.

Added:
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java
Modified:
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java
    incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Attendees.java Wed Mar 18 22:44:03 2009
@@ -32,11 +32,8 @@
         return "truncate table events_users";
     }
 
-    public Attendees(int eventId) {
-        this.eventId = ++eventId;
-    }
-
     public void prepare() {
+		eventId = getSequence() + 1;
         ThreadResource tr = ThreadResource.getInstance();
         Random r = tr.getRandom();
         int attendees = r.random(10, 100);

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Comments.java Wed Mar 18 22:44:03 2009
@@ -32,15 +32,13 @@
     int[] ratings;
     Date createdTimestamp;
 
-    public Comments(int eventId) {
-        this.eventId = ++eventId;
-    }
 
     public String getClearStatement() {
         return "truncate table comments";
     }
 
     public void prepare() {
+		eventId = getSequence() + 1;
         ThreadResource tr = ThreadResource.getInstance();
         Random r = tr.getRandom();
         int commentCount = r.random(0, 20);

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Documents.java Wed Mar 18 22:44:03 2009
@@ -26,15 +26,13 @@
 
     int eventId;
 
-    public Documents(int eventId) {
-        this.eventId = ++eventId;
-    }
-
     public String getClearStatement() {
         return "truncate table documents";
     }
 
     public void prepare() {
+		eventId = getSequence();
+		++eventId;
     }
 
 

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/EventTag.java Wed Mar 18 22:44:03 2009
@@ -29,15 +29,12 @@
     int eventId;
     int [] tagIds;
 
-    public EventTag(int eventId) {
-        this.eventId = ++eventId;
-    }
-
     public String getClearStatement() {
         return "truncate table taggings";
     }
 
     public void prepare() {
+		eventId = getSequence() + 1;
         ThreadResource tr = ThreadResource.getInstance();
         Random r = tr.getRandom();
         int numTags = r.random(1, 7); // Avg is 4 tags per event

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Friends.java Wed Mar 18 22:44:03 2009
@@ -27,15 +27,12 @@
     int id;
     int[] friends;
 
-    public Friends(int id) {
-        this.id = ++id;
-    }
-
     public String getClearStatement() {
         return "truncate table invites";
     }
 
     public void prepare() {
+		id = getSequence() + 1;
         ThreadResource tr = ThreadResource.getInstance();
         Random r = tr.getRandom();
         int count = r.random(2, 28);

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Images.java Wed Mar 18 22:44:03 2009
@@ -15,7 +15,7 @@
 /**
  * Comments Loader.
  */
-public class Images extends Loadable {
+public abstract class Images extends Loadable {
     // We use on average of 10 comments per event. Random 0..20 comments..
 
     private static final String STATEMENT = "insert into images " +
@@ -24,20 +24,14 @@
 
     static Logger logger = Logger.getLogger(Comments.class.getName());
 
-    int eventId;
+    int imageId;
     String prefix;
 
-    public Images(int eventId, String prefix) {
-        this.eventId = ++eventId;
-        this.prefix = prefix;
-    }
-
     public String getClearStatement() {
         return "truncate table images";
     }
 
-    public void prepare() {
-    }
+    public abstract void prepare();
 
 
     public void load() {
@@ -46,11 +40,11 @@
             PreparedStatement s = c.prepareStatement(STATEMENT);
             s.setInt(1, 671614);
             s.setString(2, "application/jpg");
-            s.setString(3, prefix + eventId + ".jpg");
+            s.setString(3, prefix + imageId + ".jpg");
             s.setInt(4, 1280);
             s.setInt(5, 960);
-            s.setString(6, prefix + eventId + "t.jpg");
-            s.setInt(7, this.eventId);
+            s.setString(6, prefix + imageId + "t.jpg");
+            s.setInt(7, this.imageId);
             c.addBatch();
         } catch (SQLException e) {
             logger.log(Level.SEVERE, e.getMessage(), e);

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/LoadController.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * $Id: LoadController.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
 package org.apache.olio.workload.loader;
 
 import org.apache.olio.workload.util.ScaleFactors;
@@ -16,44 +35,36 @@
         ScaleFactors.setActiveUsers(Integer.parseInt(args[2]));
 
         // Clear the database
-        logger.info("Clearing database tables.");
-        clear(new Person(0, 0));
-        clear(new Friends(0));
-        clear(new Address());
-        clear(new Tag(0));
-        clear(new SocialEvent(0));
-        clear(new EventTag(0));
-        clear(new Attendees(0));
-        clear(new Comments(0));
-        clear(new Documents(0));
-        clear(new Images(0, ""));
+        clear(Person.class);
+        clear(Friends.class);
+        clear(Address.class);
+        clear(Tag.class);
+        clear(SocialEvent.class);
+        clear(EventTag.class);
+        clear(Attendees.class);
+        clear(Comments.class);
+        clear(Documents.class);
+        clear(PersonImages.class);
+        clear(EventImages.class);
+        logger.info("Done clearing database tables.");
 
         // load person, friends, and addresses
-        logger.info("Creating persons, friends, and addresses.");
-        for (int i = 0; i < ScaleFactors.users; i++) {
-            int imageId = ScaleFactors.events + i + 1;
-            load(new Person(i, imageId));
-            load(new Friends(i));
-            load(new Address());
-            load(new Images(imageId, "p"));
-        }
+        load(Person.class, ScaleFactors.users);
+        load(Friends.class, ScaleFactors.users);
+        load(Address.class, ScaleFactors.users);
+		// load(new Images(imageId, "p"));
+        load(PersonImages.class, ScaleFactors.users);
 
         // load tags
-        logger.info("Creating tags.");
-        for (int i = 0; i < ScaleFactors.tagCount; i++) {
-            load(new Tag(i));
-        }
+        load(Tag.class, ScaleFactors.tagCount);
 
         // load events and all relationships to events
-        logger.info("Creating events, attendees, comments.");
-        for (int i = 0; i < ScaleFactors.events; i++) {
-            load(new SocialEvent(i));
-            load(new Documents(i));
-            load(new Images(i, "e"));
-            load(new EventTag(i));
-            load(new Attendees(i));
-            load(new Comments(i));
-        }
+        load(SocialEvent.class, ScaleFactors.events);
+        load(EventTag.class, ScaleFactors.events);
+        load(Attendees.class, ScaleFactors.events);
+        load(Comments.class, ScaleFactors.events);
+        load(EventImages.class, ScaleFactors.events);
+        load(Documents.class, ScaleFactors.events);
 
         waitProcessing();
         logger.info("Done data creation.");
@@ -64,7 +75,7 @@
 
         // We use a new set of connections and thread pools for postLoad.
         // This is to ensure all load tasks are done before this one starts.
-        postLoad(new Tag(0));
+        postLoad(Tag.class);
         shutdown();
         logger.info("Done post-load.");
         System.exit(0); // Signal successful loading.

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Person.java Wed Mar 18 22:44:03 2009
@@ -29,16 +29,15 @@
     String[] fields = new String[10];
     int addressId, thumbnail, imageId;
 
-    public Person(int id, int imageId) {
-        this.id = ++id;
-        this.imageId = imageId;
-    }
 
     public String getClearStatement() {
         return "truncate table users";
     }
 
     public void prepare() {
+		id = getSequence();
+        ++id;
+	    imageId = ScaleFactors.events + id;
         ThreadResource tr = ThreadResource.getInstance();
         Random r = tr.getRandom();
         StringBuilder b = tr.getBuffer();

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/SocialEvent.java Wed Mar 18 22:44:03 2009
@@ -38,15 +38,13 @@
     Date createdTimestamp;
     int[] ifields = new int[7];
 
-    public SocialEvent(int id) {
-        this.id = ++id;
-    }
 
     public String getClearStatement() {
         return "truncate table events";
     }
 
     public void prepare() {
+		id = getSequence() + 1;
         ThreadResource tr = ThreadResource.getInstance();
         Random r = tr.getRandom();
         StringBuilder buffer = tr.getBuffer();

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/Tag.java Wed Mar 18 22:44:03 2009
@@ -25,15 +25,13 @@
     int id;
     String tag;
 
-    public Tag(int id) {
-        this.id = ++id;
-    }
 
     public String getClearStatement() {
         return "truncate table tags";
     }
 
     public void prepare() {
+		id = getSequence() + 1;
         tag = UserName.getUserName(id);
     }
 

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loadable.java Wed Mar 18 22:44:03 2009
@@ -1,8 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * $Id: Loadable.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
 package org.apache.olio.workload.loader.framework;
 
 public abstract class Loadable {
 
-    protected Loader loader = Loader.getInstance(getClass().getName());
+    // Sequence is set by the pool.
+    int sequence;
+
+    protected Loader loader = Loader.getInstance(getClass());
+    LoadablePool<? extends Loadable> pool;
+
+    /**
+     * Obtains the sequence, starting from 0, of this loader.
+     *
+     * @return The sequence of this loadable.
+     */
+    protected int getSequence() {
+        return sequence;
+    }
 
     public abstract String getClearStatement();
 

Added: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java?rev=755764&view=auto
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java (added)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/LoadablePool.java Wed Mar 18 22:44:03 2009
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * $Id$
+ */
+package org.apache.olio.workload.loader.framework;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author akara
+ */
+public class LoadablePool<T extends Loadable> {
+
+    private static Logger logger = Logger.getLogger(LoadablePool.class.getName());
+    LinkedBlockingDeque<T> pool = new LinkedBlockingDeque<T>();
+    int sequence;
+    int count = 0;
+    int size;
+    Class<T> clazz;
+
+    public LoadablePool(int size, Class<T> clazz) {
+        this.size = size;
+        this.clazz = clazz;
+    }
+
+    public T getLoadable() throws Exception {
+        T loadable = pool.poll();
+        if (loadable == null) {
+            if (count < size) {
+                loadable = clazz.newInstance();
+                loadable.pool = this;
+                ++count;
+            } else {
+                for (;;) {
+                    try {
+                        loadable = pool.take();
+                        break;
+                    } catch (InterruptedException ex) {
+                        logger.log(Level.WARNING, "getLoader interrupted", ex);
+                    }
+                }
+            }
+        }
+        loadable.sequence = sequence++;
+        return loadable;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void putLoader(Loadable loadable) {
+        for (;;) {
+            try {
+                // Use a LIFO model to keep the hot objects in cache.
+                pool.putFirst((T) loadable);
+                break;
+            } catch (InterruptedException ex) {
+                logger.log(Level.WARNING, "putLoader interrupted!", ex);
+            }
+        }
+    }
+}

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/Loader.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * $Id: Loader.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
 package org.apache.olio.workload.loader.framework;
 
 import java.sql.SQLException;
@@ -5,6 +24,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.ArrayList;
 
 /**
  * The loader, one instance per Loadable type loaded, is responsible
@@ -19,8 +39,11 @@
     /** The batch size of a single batch. */
     public static final int BATCHSIZE = 1000;
 
+    /** The recycling pool size is 3 times the size of the batch. */
+    public static final int POOLSIZE = 3 * BATCHSIZE;
+
     /** The number of errors before exiting. */
-    public static final int ERROR_THRESHOLD = 100;
+    public static final int ERROR_THRESHOLD = 50;
 
     public static final int LOAD_THREADS = 5;
 
@@ -32,6 +55,11 @@
     private static ConcurrentHashMap<String, Loader> typeMap =
                             new ConcurrentHashMap<String, Loader>();
 
+    private static ConcurrentHashMap<Class, LoadablePool> poolMap =
+                            new ConcurrentHashMap<Class, LoadablePool>();
+
+    private static ArrayList<Thread> mainLoaders = new ArrayList<Thread>();
+
     // This is a single processing pool for processing data preps.
     private static ExecutorService processor =
                                 Executors.newCachedThreadPool();
@@ -39,19 +67,22 @@
     private String name;
     AtomicInteger loadCount;
 
+    LoadablePool<? extends Loadable> loadablePool;
+
     // A Loadable type database loading pool.
     ExecutorService pool;
     ConcurrentLinkedQueue<Loadable> queue;
 
     /**
      * Obtains the instance of the loader for a given loadable type.
-     * @param name The loadable type name
+     * @param clazz The loadable type
      * @return The loader for this type name, or a new loader if none exists
      */
-    static Loader getInstance(String name) {
+    static Loader getInstance(Class<? extends Loadable> clazz) {
         // We may need to change this to a configurable thread pool size
         // on a per-type basis. This is the only place to change.
 
+        String name = clazz.getName();
         Loader loader = new Loader();
         Loader oldEntry = typeMap.putIfAbsent(name, loader);
 
@@ -77,6 +108,19 @@
             // pool = Executors.newCachedThreadPool();
     }
 
+    private static <T extends Loadable> LoadablePool<T>
+            getLoadablePool(Class<T> clazz) {
+        LoadablePool<T> pool = new LoadablePool<T>(3 * BATCHSIZE, clazz);
+        @SuppressWarnings("unchecked")
+                LoadablePool<T> oldEntry = poolMap.putIfAbsent(clazz, pool);
+
+        if (oldEntry != null) {
+            pool = oldEntry;
+        }
+
+        return pool;
+    }
+
     /**
      * Sets the URL for the connection to the database.
      * @param url The connection URL
@@ -95,10 +139,21 @@
     /**
      * Uses the loadable to clear the database through the loadable's
      * clear statement.
-     * @param l The loadable to use
+     * @param clazz The loadable class to use
      */
-    public static void clear(final Loadable l) {
+    public static void clear(Class<? extends Loadable> clazz) {
+        Loadable loadable = null;
+        try {
+            loadable = clazz.newInstance();
+        } catch (Exception ex) {
+            logger.log(Level.SEVERE, "Error instantiating loader class.", ex);
+            increaseErrorCount();
+        }
+
+        if (loadable != null) {
+            final Loadable l = loadable;
         Future f = l.loader.pool.submit(new Runnable() {
+
             public void run() {
                 ThreadConnection c = ThreadConnection.getInstance();
                 try {
@@ -115,11 +170,12 @@
             try {
                 Thread.sleep(200);
             } catch (InterruptedException e) {
-                logger.log(Level.WARNING, l.loader.name + ": Interrupted while " +
-                                                "waiting to clear table.", e);
+                    logger.log(Level.WARNING, l.loader.name + ": Interrupted " +
+                            "while waiting to clear table.", e);
             }
         }
     }
+    }
 
     /**
      * Loads the loadable into the database. Note that the loading is done
@@ -129,10 +185,29 @@
      * will gracefully shut down the processing infrastructure and wait until
      * all preparation is done. Shutdown will wait until all data loading
      * is done.
-     * @param l
+     * @param clazz The loadable class
+     * @param occurrences The number of load iterations
      */
-    public static void load(final Loadable l) {
+    public static void load(Class<? extends Loadable> clazz, int occurrences) {
+
+        final Class<? extends Loadable> c = clazz;
+        final int occ = occurrences;
+        Thread mainLoader = new Thread() {
+
+            @Override
+            public void run() {
+                for (int i = 0; i < occ; i++) {
+                    Loadable loadable = null;
+                    try {
+                        loadable = getLoadablePool(c).getLoadable();
+                    } catch (Exception ex) {
+                        logger.log(Level.SEVERE, "Error obtaining loadable", ex);
+                        increaseErrorCount();
+                    }
+                    if (loadable != null) {
+                        final Loadable l = loadable;
         processor.execute(new Runnable() {
+
             public void run() {
                 try {
                     l.prepare();
@@ -144,13 +219,35 @@
             }
         });
     }
+                }
+            }
+        };
+        mainLoaders.add(mainLoader);
+        mainLoader.start();
+    }
+
+    public static void exec(Runnable r) {
+        processor.execute(r);
+    }
 
     /**
      * Execute the post loads provided by the loadable.
-     * @param l The loadable.
+     * @param clazz The loadable class
      */
-    public static void postLoad(final Loadable l) {
+    public static void postLoad(Class<? extends Loadable> clazz) {
+        Loadable loadable = null;
+        try {
+            loadable = clazz.newInstance();
+        } catch (Exception ex) {
+            logger.log(Level.SEVERE, "Error instantiating loader class.", ex);
+            increaseErrorCount();
+        }
+
+        if (loadable != null) {
+
+            final Loadable l = loadable;
         l.loader.pool.submit(new Runnable() {
+
             public void run() {
                 try {
                     l.postLoad();
@@ -162,6 +259,7 @@
             }
         });
     }
+    }
 
 
     private void add(Loadable l) {
@@ -185,6 +283,16 @@
      * preparation is done.
      */
     public static void waitProcessing() {
+        // Wait for the main loaders
+        for (Thread mainLoader : mainLoaders) {
+            for (;;)
+                try {
+                    mainLoader.join();
+                    break;
+                } catch (InterruptedException e) {
+                    logger.log(Level.WARNING, e.getMessage(), e);
+                }
+        }
         // We ensure the process pool is cleared, first.
         if (processor != null) {
         processor.shutdown();

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadConnection.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * $Id: ThreadConnection.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
 package org.apache.olio.workload.loader.framework;
 
 import java.sql.*;
@@ -24,7 +43,7 @@
     };
 
     private static boolean COMMIT_TX = Boolean.parseBoolean(
-                                     System.getProperty("commit.tx", "true"));
+                                    System.getProperty("commit.tx", "true"));
     private static final List<ThreadConnection> CONNECTIONLIST =
             Collections.synchronizedList(new ArrayList<ThreadConnection>());
 
@@ -63,7 +82,6 @@
         try {
             if (conn == null || conn.isClosed()) {
                 conn = DriverManager.getConnection(connectionURL);
-		        //conn.setAutoCommit(false);
                 statement = null;
                 statementText = null;
             }
@@ -118,7 +136,8 @@
         }
     }
 
-    void processBatch(String name, int batchCount, Queue<Loadable> queue) {
+    void processBatch(String name, int batchCount,
+            Queue<? extends Loadable> queue) {
         // First we need to save the load objects from the queue
         // so we do not loose them in case we need to retry.
         if (batchBuffer == null) {
@@ -148,7 +167,7 @@
             batchName = "final " + count +  " object batch.";
 
         int flushed = 0;
-        for (int retry = 0; retry < 10; retry++) {
+        for (int retry = 0; retry < 2; retry++) {
             try {
                 for (int i = flushed; i < count; i++) {
                     batchBuffer[i].load();
@@ -175,7 +194,7 @@
                 logger.fine(name + ": Loaded " + batchName);
                 break; // We won't retry if everything is OK.
             } catch (BatchUpdateException e) {
-                if (retry < 10) {
+                if (retry == 0) {
                     resetConnection();
                     logger.log(Level.WARNING, name +
                                                 ": Retry loading.", e);
@@ -205,11 +224,12 @@
         }
 
         // Once we're done with this buffer, don't hold on to the objects.
-        // Let them get GC'd so we don't bloat memory. Minimal CPU cost
-        // for such tight loop and setting all entries to null.
-        for (int i = 0; i < batchBuffer.length; i++)
+        // Return them to the pool so we don't bloat memory.
+        for (int i = 0; i < batchBuffer.length; i++) {
+            batchBuffer[i].pool.putLoader(batchBuffer[i]);
             batchBuffer[i] = null;
     }
+    }
 
     void flush() throws SQLException {
         statement.executeBatch();
@@ -227,7 +247,6 @@
     }
 
     static void closeConnections() {
-        System.out.println("Closing connection");
         synchronized (CONNECTIONLIST) {
             for (ThreadConnection c : CONNECTIONLIST)
                 try {

Modified: incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java?rev=755764&r1=755763&r2=755764&view=diff
==============================================================================
--- incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java (original)
+++ incubator/olio/workload/rails/trunk/src/org/apache/olio/workload/loader/framework/ThreadResource.java Wed Mar 18 22:44:03 2009
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * $Id: ThreadResource.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ */
 package org.apache.olio.workload.loader.framework;
 
 import com.sun.faban.driver.util.Random;