You are viewing a plain text version of this content. The canonical link for it is here.
Posted to olio-commits@incubator.apache.org by ak...@apache.org on 2009/08/18 19:13:37 UTC
svn commit: r805541 - in
/incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader:
FileLoader.java FileLoader2.java
Author: akara
Date: Tue Aug 18 19:13:37 2009
New Revision: 805541
URL: http://svn.apache.org/viewvc?rev=805541&view=rev
Log:
Issue OLIO-40: Moved FileLoader2 to FileLoader both for java class name and shell script name.
Removed:
incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader2.java
Modified:
incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java
Modified: incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java
URL: http://svn.apache.org/viewvc/incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java?rev=805541&r1=805540&r2=805541&view=diff
==============================================================================
--- incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java (original)
+++ incubator/olio/workload/php/trunk/src/org/apache/olio/workload/fsloader/FileLoader.java Tue Aug 18 19:13:37 2009
@@ -14,33 +14,65 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
- * $Id: FileLoader.java,v 1.1.1.1 2008/09/29 22:33:08 sp208304 Exp $
+ *
+ * $Id$
*/
-
package org.apache.olio.workload.fsloader;
+import com.sun.faban.harness.util.FileHelper;
import org.apache.olio.workload.util.ScaleFactors;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Formatter;
import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class FileLoader {
+ /**
+ * The default number of loader threads. Since the threads are blocking
+ * for I/O, this number may be quite high.
+ */
+ public static final int DEFAULT_LOADER_THREADS = 128;
+
+ /**
+ * The frequency to report status, in number of files.
+ */
+ public static final int REPORT_FREQUENCY = 10000;
+
+ private static final String DIR_PATTERN = File.separator + "%03d" + File.separator + "%03d" + File.separator;
+
private static Logger logger = Logger.getLogger(FileLoader.class.getName());
+ private static ExecutorService threadPool;
+ private static ArrayList<FileChannel> openFiles = new ArrayList<FileChannel>();
+ private static LoaderPool loaderPool;
public static void main(String[] args) throws Exception {
String srcDir = args[0];
String destDir = args[1];
ScaleFactors.setActiveUsers(Integer.parseInt(args[2]));
+
+ int loaderThreads;
+ if (args.length > 3)
+ loaderThreads = Integer.parseInt(args[3]);
+ else
+ loaderThreads = DEFAULT_LOADER_THREADS;
+
+ // Throttle the job producer so that the objects in the work
+ // queue are limited to 10x the worker threads.
+ loaderPool = new LoaderPool(loaderThreads * 10);
+
srcDir += File.separator;
// Clear the dest dir
@@ -51,135 +83,147 @@
}
logger.info("Deleting files in " + destDir);
- File[] list = dest.listFiles();
- for (File f : list) {
- String name = f.getName();
- boolean delete = false;
- if (name.endsWith(".jpg"))
- delete = true;
- else if (name.endsWith(".JPG"))
- delete = true;
- else if (name.endsWith(".pdf"))
- delete = true;
- else if (name.endsWith(".PDF"))
- delete = true;
- if (delete && !f.delete())
- logger.warning("Error deleting file " + f.getName());
- }
-
- ArrayList<LoaderThread> loaders = new ArrayList<LoaderThread>();
-
- loaders.add(new LoaderThread(srcDir + "person.jpg",
- destDir + File.separator + "p%d.jpg", ScaleFactors.users));
- loaders.add(new LoaderThread(srcDir + "person_thumb.jpg",
- destDir + File.separator + "p%dt.jpg", ScaleFactors.users));
- loaders.add(new LoaderThread(srcDir + "event.jpg",
- destDir + File.separator + "e%d.jpg", ScaleFactors.events));
- loaders.add(new LoaderThread(srcDir + "event_thumb.jpg",
- destDir + File.separator + "e%dt.jpg", ScaleFactors.events));
- loaders.add(new LoaderThread(srcDir + "event.pdf",
- destDir + File.separator + "e%d.pdf", ScaleFactors.events));
- for (LoaderThread loader : loaders) {
- loader.join();
- }
+ File f = new File(dest, "persons");
+ if (f.exists())
+ FileHelper.recursiveDelete(f);
+
+ f = new File(dest, "personThumbs");
+ if (f.exists())
+ FileHelper.recursiveDelete(f);
+
+ f = new File(dest, "events");
+ if (f.exists())
+ FileHelper.recursiveDelete(f);
+
+ f = new File(dest, "eventThumbs");
+ if (f.exists())
+ FileHelper.recursiveDelete(f);
+
+ f = new File(dest, "eventLits");
+ if (f.exists())
+ FileHelper.recursiveDelete(f);
+
+ threadPool = Executors.newFixedThreadPool(loaderThreads);
+
+ logger.info("Loading files to " + destDir);
+
+ load(srcDir + "person.jpg", destDir + File.separator +
+ "persons", "p%d.jpg", ScaleFactors.users);
+ load(srcDir + "person_thumb.jpg", destDir + File.separator +
+ "personThumbs", "p%dt.jpg", ScaleFactors.users);
+ load(srcDir + "event.jpg", destDir + File.separator +
+ "events", "e%d.jpg", ScaleFactors.events);
+ load(srcDir + "event_thumb.jpg", destDir + File.separator +
+ "eventThumbs", "e%dt.jpg", ScaleFactors.events);
+ load(srcDir + "event.pdf", destDir + File.separator +
+ "eventLits", "e%dl.pdf", ScaleFactors.events);
+
+ threadPool.shutdown();
+
+ boolean terminated = false;
+ while (!terminated)
+ try {
+ terminated = threadPool.awaitTermination(1, TimeUnit.HOURS);
+ } catch (InterruptedException e) {
+ }
- for (LoaderThread loader : loaders) {
- loader.close();
+ for (FileChannel channel : openFiles) {
+ channel.close();
}
System.exit(0);
-/*
- FileChannel img = new FileInputStream(
- srcDir + "person.jpg").getChannel();
- FileChannel thumb = new FileInputStream(
- srcDir + "person_thumb.jpg").getChannel();
- long imgSize = img.size();
- long thumbSize = thumb.size();
-
- logger.info("Loading user images...");
- for (int i = 1; i <= ScaleFactors.users; i++) {
- logger.finer("Loading files for user " + i);
- copyTo(img, imgSize, destDir + File.separator + "p" + i + ".jpg");
- copyTo(thumb, thumbSize,
- destDir + File.separator + "p" + i + "t.jpg");
- }
-
- img.close();
- thumb.close();
-
- logger.info("Loading event images and files...");
- img = new FileInputStream(srcDir + "event.jpg").getChannel();
- thumb = new FileInputStream(srcDir + "event_thumb.jpg").getChannel();
- FileChannel lit = new FileInputStream(
- srcDir + "event.pdf").getChannel();
-
- imgSize = img.size();
- thumbSize = thumb.size();
- long litSize = lit.size();
-
- for (int i = 1; i <= ScaleFactors.events; i++) {
- logger.finer("Loading files for event " + i);
- copyTo(img, imgSize, destDir + File.separator + "e" + i + ".jpg");
- copyTo(thumb, thumbSize,
- destDir + File.separator + "e" + i + "t.jpg");
- copyTo(lit, litSize, destDir + File.separator + "e" + i + ".pdf");
- }
-
- img.close();
- thumb.close();
- lit.close();
- System.exit(0);
-*/
}
- /*
- private static void copyTo(FileChannel src, long size, String destFile)
- throws IOException {
- FileChannel dest = (new FileOutputStream(destFile)).getChannel();
- src.transferTo(0, size, dest);
- dest.close();
+ private static void load(String srcFile, String destDir,
+ String destPattern, int count)
+ throws FileNotFoundException {
+ Formatter format = new Formatter();
+ FileChannel srcChannel = new FileInputStream(srcFile).getChannel();
+ openFiles.add(srcChannel);
+ for (int i = 1; i <= count; i++) {
+ int dir1 = i % 1000;
+ int dir2 = i / 1000000 % 1000;
+ String dirName =
+ format.format(destDir + DIR_PATTERN, dir1, dir2).toString();
+ File dir = new File(dirName);
+ dir.mkdirs();
+ String dest = format.format(destPattern, i).toString();
+ if (i % REPORT_FREQUENCY == 0) {
+ logger.info("Copying to " + dest);
+ }
+ ((StringBuilder) format.out()).setLength(0);
+ threadPool.submit(loaderPool.getLoader(srcChannel, dest, i));
+ }
}
- */
- static class LoaderThread extends Thread {
+
+ static class Loader implements Runnable {
FileChannel src;
- int count;
- long size;
- String pattern;
- Formatter format;
-
- public LoaderThread(String src, String destPattern, int count)
- throws IOException {
- this.src = new FileInputStream(src).getChannel();
- size = this.src.size();
- this.count = count;
- this.pattern = destPattern;
- format = new Formatter();
- start();
- }
+ String dest;
+ int fileNo;
public void run() {
- for (int i = 1; i <= count; i++) {
- String dest = format.format(pattern, i).toString();
- if (i % 1000 == 0) {
+ try {
+ FileChannel destChannel =
+ new FileOutputStream(dest).getChannel();
+ src.transferTo(0, src.size(), destChannel);
+ destChannel.close();
+ if (fileNo % REPORT_FREQUENCY == 0)
logger.info("Copying to " + dest);
- }
- ((StringBuilder) format.out()).setLength(0);
- try {
- FileChannel destChannel = new FileOutputStream(dest).
- getChannel();
- src.transferTo(0, size, destChannel);
- destChannel.close();
- } catch (IOException e) {
- logger.log(Level.WARNING, "Error writing file " + dest, e);
+
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "Error writing file " + dest, e);
+ } finally {
+ loaderPool.putLoader(this);
+ }
+ }
+ }
+
+ static class LoaderPool {
+
+ LinkedBlockingDeque<Loader> pool = new LinkedBlockingDeque<Loader>();
+ int count = 0;
+ int size;
+
+ public LoaderPool(int size) {
+ this.size = size;
+ }
+
+ public Loader getLoader(FileChannel src, String dest, int fileNo) {
+ Loader loader = pool.poll();
+ if (loader == null) {
+ if (count < size) {
+ loader = new Loader();
+ ++count;
+ } else {
+ for (;;) {
+ try {
+ loader = pool.take();
+ break;
+ } catch (InterruptedException ex) {
+ logger.log(Level.WARNING, "getLoader interrupted", ex);
+ }
+ }
}
}
+ loader.src = src;
+ loader.dest = dest;
+ loader.fileNo = fileNo;
+ return loader;
}
- public void close() throws IOException {
- src.close();
+ public void putLoader(Loader loader) {
+ for (;;) {
+ try {
+ // User a LIFO model to keep the hot objects in cache.
+ pool.putFirst(loader);
+ break;
+ } catch (InterruptedException ex) {
+ logger.log(Level.WARNING, "putLoader interrupted!", ex);
+ }
+ }
}
}
}