You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/23 23:01:17 UTC
svn commit: r1175018 - in /incubator/jena/Scratch/PC/tdbloader2/trunk:
README pom.xml src/main/java/cmd/tdbloader2.java
src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
Author: castagna
Date: Fri Sep 23 21:01:17 2011
New Revision: 1175018
URL: http://svn.apache.org/viewvc?rev=1175018&view=rev
Log:
JENA-117
Modified:
incubator/jena/Scratch/PC/tdbloader2/trunk/README
incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/README
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/README?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/README (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/README Fri Sep 23 21:01:17 2011
@@ -2,4 +2,34 @@ tdbloader2
----------
This is an experimental (pure Java) version of tdbloader2.
-See also: https://issues.apache.org/jira/browse/JENA-117
\ No newline at end of file
+See also: https://issues.apache.org/jira/browse/JENA-117
+
+If you want to try it, here is how you can check out and compile it:
+
+ cd /tmp
+ svn co https://svn.apache.org/repos/asf/incubator/jena/Scratch/PC/tdbloader2/trunk/ tdbloader2
+ cd /tmp/tdbloader2
+ mvn package
+
+This is how you can run it:
+
+ java -cp target/tdbloader2-0.1-incubating-SNAPSHOT-jar-with-dependencies.jar -server -d64 -Xmx6144M cmd.tdbloader2 --no-stats --compression --spill-size 1500000 --loc /tmp/tdb /opt/datasets/raw/openlibrary-2011-06-02.nt.gz
+
+For a list of the options:
+
+ java -cp target/tdbloader2-0.1-incubating-SNAPSHOT-jar-with-dependencies.jar cmd.tdbloader2 -h
+
+ cmd.tdbloader2 --loc=DIR FILE ...
+ General
+ -v --verbose Verbose
+ -q --quiet Run with minimal output
+ --debug Output information for debugging
+ --help
+ --version Version information
+ --loc Location
+ --compression Use compression for intermediate files
+ --buffer-size The size of buffers for IO in bytes
+ --gzip-outside GZIP...(Buffered...())
+ --spill-size The size of spillable segments in tuples|records
+ --no-stats Do not generate the stats file
+ --no-buffer Do not use Buffered{Input|Output}Stream
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml Fri Sep 23 21:01:17 2011
@@ -105,40 +105,22 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.1.2</version>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2.1</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
<executions>
<execution>
- <id>attach-sources</id>
+ <id>make-assembly</id>
<phase>package</phase>
<goals>
- <goal>jar-no-fork</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
+ <goal>single</goal>
</goals>
</execution>
</executions>
- <configuration>
- <version>true</version>
- <show>public</show>
- <quiet>true</quiet>
- <encoding>${project.build.sourceEncoding}</encoding>
- <windowtitle>${project.name} ${project.version}</windowtitle>
- <doctitle>${project.name} ${project.version}</doctitle>
- <!-- Exclude the implementation -->
- <includePackageNames>org.apache.jena.larq</includePackageNames>
- </configuration>
</plugin>
<plugin>
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java Fri Sep 23 21:01:17 2011
@@ -376,26 +376,24 @@ public class tdbloader2 extends CmdGener
// Utility methods for RDF parsing...
public static final NodeToLabel nodeToLabel = NodeToLabel.createBNodeByLabelAsGiven();
+ private static final Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
public static String serialize(Node node) {
- Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
StringWriter out = new StringWriter();
OutputLangUtils.output(out, node, prologue, nodeToLabel);
return out.toString();
}
private static ParserProfile createParserProfile(String runId, String filename) {
- Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
LabelToNode labelMapping = new CustomLabelToNode(runId, filename);
return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping);
}
private static ParserProfile createParserProfile() {
- Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
LabelToNode labelMapping = LabelToNode.createUseLabelAsGiven() ;
return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping);
}
-
+
private static ParserProfile profile = createParserProfile();
public static Node parse(String string) {
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java Fri Sep 23 21:01:17 2011
@@ -32,6 +32,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.openjena.atlas.AtlasException;
import org.openjena.atlas.data.AbstractDataBag;
@@ -48,10 +50,14 @@ public class MultiThreadedSortedDataBag<
private final SerializationFactory<E> serializationFactory;
private final Comparator<? super E> comparator;
+ private ExecutorService pool = Executors.newSingleThreadExecutor();
+ private boolean multithreaded = true ;
+
protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;
+
public MultiThreadedSortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
{
this.policy = policy;
@@ -115,7 +121,11 @@ public class MultiThreadedSortedDataBag<
Object[] array = memory.toArray();
Sink<E> serializer = serializationFactory.createSerializer(out);
- new Spiller (array, serializer).start();
+ if ( multithreaded ) {
+ pool.execute( new Spiller (array, serializer) ) ;
+ } else {
+ spill(array, serializer) ;
+ }
spilled = true;
policy.reset();
@@ -123,7 +133,7 @@ public class MultiThreadedSortedDataBag<
}
}
- class Spiller extends Thread {
+ class Spiller implements Runnable {
private Object[] array ;
private Sink<E> serializer ;
@@ -133,23 +143,26 @@ public class MultiThreadedSortedDataBag<
this.serializer = serializer ;
}
- @SuppressWarnings("unchecked")
@Override
public void run() {
- Arrays.sort(array, (Comparator)comparator);
- try
- {
- for (Object tuple : array)
- {
- serializer.send((E)tuple);
- }
- }
- finally
+ spill(array, serializer) ;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private final void spill (Object[] array, Sink<E> serializer) {
+ Arrays.sort(array, (Comparator)comparator);
+ try
+ {
+ for (Object tuple : array)
{
- serializer.close();
+ serializer.send((E)tuple);
}
}
-
+ finally
+ {
+ serializer.close();
+ }
}
public void flush()
@@ -184,6 +197,8 @@ public class MultiThreadedSortedDataBag<
if (spilled)
{
+ if ( ( multithreaded ) && ( ! pool.isShutdown() ) ) pool.shutdown() ;
+
List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(spillFiles.size() + (memSize > 0 ? 1 : 0));
if (memSize > 0)