You are viewing a plain text version of this content. The canonical link for it is here.
Posted to olio-commits@incubator.apache.org by ak...@apache.org on 2009/08/18 19:13:37 UTC

svn commit: r805541 - in /incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader: FileLoader.java FileLoader2.java

Author: akara
Date: Tue Aug 18 19:13:37 2009
New Revision: 805541

URL: http://svn.apache.org/viewvc?rev=805541&view=rev
Log:
Issue OLIO-40: Moved FileLoader2 to FileLoader both for java class name and shell script name.

Removed:
    incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader2.java
Modified:
    incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java

Modified: incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java?rev=805541&r1=805540&r2=805541&view=diff
==============================================================================
--- incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java (original)
+++ incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java Tue Aug 18 19:13:37 2009
@@ -14,33 +14,65 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * 
- *  $Id: FileLoader.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ *
+ *  $Id$
  */
-
 package org.apache.olio.workload.fsloader;
 
+import com.sun.faban.harness.util.FileHelper;
 import org.apache.olio.workload.util.ScaleFactors;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Formatter;
 import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 
 public class FileLoader {
 
+    /**
+     * The default number of loader threads. Since the threads are blocking
+     * for I/O, this number may be quite high.
+     */
+    public static final int DEFAULT_LOADER_THREADS = 128;
+
+    /**
+     * The frequency to report status, in number of files.
+     */
+    public static final int REPORT_FREQUENCY = 10000;
+
+    private static final String DIR_PATTERN = File.separator + "%03d" + File.separator + "%03d" + File.separator;
+
     private static Logger logger = Logger.getLogger(FileLoader.class.getName());
+    private static ExecutorService threadPool;
+    private static ArrayList<FileChannel> openFiles = new ArrayList<FileChannel>();
+    private static LoaderPool loaderPool;
 
     public static void main(String[] args) throws Exception {
         String srcDir = args[0];
         String destDir = args[1];
         ScaleFactors.setActiveUsers(Integer.parseInt(args[2]));
+
+        int loaderThreads;
+        if (args.length > 3)
+            loaderThreads = Integer.parseInt(args[3]);
+        else
+            loaderThreads = DEFAULT_LOADER_THREADS;
+
+        // Throttle the job producer so that the objects in the work
+        // queue are limited to 10x the worker threads.
+        loaderPool = new LoaderPool(loaderThreads * 10);
+
         srcDir += File.separator;
 
         // Clear the dest dir
@@ -51,135 +83,147 @@
         }
 
         logger.info("Deleting files in " + destDir);
-        File[] list = dest.listFiles();
-        for (File f : list) {
-            String name = f.getName();
-            boolean delete = false;
-            if (name.endsWith(".jpg"))
-                delete = true;
-            else if (name.endsWith(".JPG"))
-                delete = true;
-            else if (name.endsWith(".pdf"))
-                delete = true;
-            else if (name.endsWith(".PDF"))
-                delete = true;
-            if (delete && !f.delete())
-                logger.warning("Error deleting file " + f.getName());
-        }
-
-        ArrayList<LoaderThread> loaders = new ArrayList<LoaderThread>();
-
-        loaders.add(new LoaderThread(srcDir + "person.jpg",
-                destDir + File.separator + "p%d.jpg", ScaleFactors.users));
-        loaders.add(new LoaderThread(srcDir + "person_thumb.jpg",
-                destDir + File.separator + "p%dt.jpg", ScaleFactors.users));
-        loaders.add(new LoaderThread(srcDir + "event.jpg",
-                destDir + File.separator + "e%d.jpg", ScaleFactors.events));
-        loaders.add(new LoaderThread(srcDir + "event_thumb.jpg",
-                destDir + File.separator + "e%dt.jpg", ScaleFactors.events));
-        loaders.add(new LoaderThread(srcDir + "event.pdf",
-                destDir + File.separator + "e%d.pdf", ScaleFactors.events));
 
-        for (LoaderThread loader : loaders) {
-            loader.join();
-        }
+        File f = new File(dest, "persons");
+        if (f.exists())
+            FileHelper.recursiveDelete(f);
+
+        f = new File(dest, "personThumbs");
+        if (f.exists())
+            FileHelper.recursiveDelete(f);
+
+        f = new File(dest, "events");
+        if (f.exists())
+            FileHelper.recursiveDelete(f);
+
+        f = new File(dest, "eventThumbs");
+        if (f.exists())
+            FileHelper.recursiveDelete(f);
+
+        f = new File(dest, "eventLits");
+        if (f.exists())
+            FileHelper.recursiveDelete(f);
+
+        threadPool = Executors.newFixedThreadPool(loaderThreads);
+
+        logger.info("Loading files to " + destDir);
+
+        load(srcDir + "person.jpg", destDir + File.separator +
+                "persons", "p%d.jpg", ScaleFactors.users);
+        load(srcDir + "person_thumb.jpg", destDir + File.separator +
+                "personThumbs", "p%dt.jpg", ScaleFactors.users);
+        load(srcDir + "event.jpg", destDir + File.separator +
+                "events", "e%d.jpg", ScaleFactors.events);
+        load(srcDir + "event_thumb.jpg", destDir + File.separator +
+                "eventThumbs", "e%dt.jpg", ScaleFactors.events);
+        load(srcDir + "event.pdf", destDir + File.separator +
+                "eventLits", "e%dl.pdf", ScaleFactors.events);
+
+        threadPool.shutdown();
+
+        boolean terminated = false;
+        while (!terminated)
+            try {
+                terminated = threadPool.awaitTermination(1, TimeUnit.HOURS);
+            } catch (InterruptedException e) {
+            }
 
-        for (LoaderThread loader : loaders) {
-            loader.close();
+        for (FileChannel channel : openFiles) {
+            channel.close();
         }
 
         System.exit(0);
-/*
-        FileChannel img = new FileInputStream(
-                                    srcDir + "person.jpg").getChannel();
-        FileChannel thumb = new FileInputStream(
-                                    srcDir + "person_thumb.jpg").getChannel();
-        long imgSize = img.size();
-        long thumbSize = thumb.size();
-
-        logger.info("Loading user images...");
-        for (int i = 1; i <= ScaleFactors.users; i++) {
-            logger.finer("Loading files for user " + i);
-            copyTo(img, imgSize, destDir + File.separator + "p" + i + ".jpg");
-            copyTo(thumb, thumbSize,
-                    destDir + File.separator + "p" + i + "t.jpg");
-        }
-
-        img.close();
-        thumb.close();
-
-        logger.info("Loading event images and files...");
-        img = new FileInputStream(srcDir + "event.jpg").getChannel();
-        thumb = new FileInputStream(srcDir + "event_thumb.jpg").getChannel();
-        FileChannel lit = new FileInputStream(
-                                    srcDir + "event.pdf").getChannel();
-
-        imgSize = img.size();
-        thumbSize = thumb.size();
-        long litSize = lit.size();
-
-        for (int i = 1; i <= ScaleFactors.events; i++) {
-            logger.finer("Loading files for event " + i);
-            copyTo(img, imgSize, destDir + File.separator + "e" + i + ".jpg");
-            copyTo(thumb, thumbSize,
-                    destDir + File.separator + "e" + i + "t.jpg");
-            copyTo(lit, litSize, destDir + File.separator + "e" + i + ".pdf");
-        }
-
-        img.close();
-        thumb.close();
-        lit.close();
-        System.exit(0);
-*/
     }
 
-    /*
-    private static void copyTo(FileChannel src, long size, String destFile)
-            throws IOException {
-        FileChannel dest = (new FileOutputStream(destFile)).getChannel();
-        src.transferTo(0, size, dest);
-        dest.close();
+    private static void load(String srcFile, String destDir,
+                             String destPattern, int count)
+            throws FileNotFoundException {
+        Formatter format = new Formatter();
+        FileChannel srcChannel = new FileInputStream(srcFile).getChannel();
+        openFiles.add(srcChannel);
+        for (int i = 1; i <= count; i++) {
+            int dir1 = i % 1000;
+            int dir2 = i / 1000000 % 1000;
+            String dirName =
+                    format.format(destDir + DIR_PATTERN, dir1, dir2).toString();
+            File dir = new File(dirName);
+            dir.mkdirs();
+            String dest = format.format(destPattern, i).toString();
+            if (i % REPORT_FREQUENCY == 0) {
+                logger.info("Copying to " + dest);
+            }
+            ((StringBuilder) format.out()).setLength(0);
+            threadPool.submit(loaderPool.getLoader(srcChannel, dest, i));
+        }
     }
-    */
 
-    static class LoaderThread extends Thread {
+
+    static class Loader implements Runnable {
 
         FileChannel src;
-        int count;
-        long size;
-        String pattern;
-        Formatter format;
-
-        public LoaderThread(String src, String destPattern, int count)
-                throws IOException {
-            this.src = new FileInputStream(src).getChannel();
-            size = this.src.size();
-            this.count = count;
-            this.pattern = destPattern;
-            format = new Formatter();
-            start();
-        }
+        String dest;
+        int fileNo;
 
         public void run() {
-            for (int i = 1; i <= count; i++) {
-                String dest = format.format(pattern, i).toString();
-                if (i % 1000 == 0) {
+            try {
+                FileChannel destChannel =
+                        new FileOutputStream(dest).getChannel();
+                src.transferTo(0, src.size(), destChannel);
+                destChannel.close();
+                if (fileNo % REPORT_FREQUENCY == 0)
                     logger.info("Copying to " + dest);
-                }
-                ((StringBuilder) format.out()).setLength(0);
-                try {
-                    FileChannel destChannel = new FileOutputStream(dest).
-                                                            getChannel();
-                    src.transferTo(0, size, destChannel);
-                    destChannel.close();
-                } catch (IOException e) {
-                    logger.log(Level.WARNING, "Error writing file " + dest, e);
+
+            } catch (IOException e) {
+                logger.log(Level.WARNING, "Error writing file " + dest, e);
+            } finally {
+                loaderPool.putLoader(this);
+            }
+        }
+    }
+
+    static class LoaderPool {
+
+        LinkedBlockingDeque<Loader> pool = new LinkedBlockingDeque<Loader>();
+        int count = 0;
+        int size;
+
+        public LoaderPool(int size) {
+            this.size = size;
+        }
+
+        public Loader getLoader(FileChannel src, String dest, int fileNo) {
+            Loader loader = pool.poll();
+            if (loader == null) {
+                if (count < size) {
+                    loader = new Loader();
+                    ++count;
+                } else {
+                    for (;;) {
+                    try {
+                        loader = pool.take();
+                        break;
+                    } catch (InterruptedException ex) {
+                        logger.log(Level.WARNING, "getLoader interrupted", ex);
+                    }
+                    }
                 }
             }
+            loader.src = src;
+            loader.dest = dest;
+            loader.fileNo = fileNo;
+            return loader;
         }
 
-        public void close() throws IOException {
-            src.close();
+        public void putLoader(Loader loader) {
+            for (;;) {
+                try {
+                    // Use a LIFO model to keep the hot objects in cache.
+                    pool.putFirst(loader);
+                    break;
+                } catch (InterruptedException ex) {
+                    logger.log(Level.WARNING, "putLoader interrupted!", ex);
+                }
+            }
         }
     }
 }