You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to alois-commits@incubator.apache.org by fl...@apache.org on 2010/11/30 15:11:47 UTC

svn commit: r1040571 - in /incubator/alois/trunk: prisma/data/prisma/prisma_database/model/ prisma/lib/prisma/ prisma/test/ rails/test/

Author: flavio
Date: Tue Nov 30 15:11:46 2010
New Revision: 1040571

URL: http://svn.apache.org/viewvc?rev=1040571&view=rev
Log:
Worked on testing, few minor bugfixes and better logging

Modified:
    incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb
    incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb
    incubator/alois/trunk/prisma/lib/prisma/archivator.rb
    incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb
    incubator/alois/trunk/prisma/lib/prisma/transform.rb
    incubator/alois/trunk/prisma/test/test_helper.rb
    incubator/alois/trunk/rails/test/test_helper.rb

Modified: incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb (original)
+++ incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb Tue Nov 30 15:11:46 2010
@@ -25,20 +25,20 @@
 	  throw "Suspicious line found at line #{self.current} (unquoted \" found)!" if line =~ /\".*[^\\]\".*\"/
 	  msg = YAML.parse(eval(line)).transform
 	rescue 
-	 $log.error "Error getting archive record \##{self.current}. (#{$!.message})" if $log.error?
+          $log.error{"Error getting archive record \##{self.current}. (#{$!.message})"}
 	end
 	yield msg if msg
 	exit(0) if $terminate
       end
     end
     
-    def initialize( filename ) 
-      super(nil)
+    def prisma_initialize( filename ) 
       self.filename = filename
       self.current = 0
       self.total = open(filename).readlines.length
       self.todo = self.total
       self.save
+      self
     end
     
     def messages

Modified: incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb (original)
+++ incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb Tue Nov 30 15:11:46 2010
@@ -1,3 +1,4 @@
+# -*- coding: undecided -*-
 # Copyright 2010 The Apache Software Foundation.
 # 
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -48,11 +49,14 @@ class CompressionMeta < ActiveRecord::Ba
       extname = meta_message.read_option("extname")
       cmd = DECOMPRESSION_COMMANDS[extname]
 
-      tmpf = "#{Dir.tmpdir}/CompressMeta-{Process.pid}"
-      File.open(tmpf + extname ,"w") {|f| f.write(message.msg)}
-      throw "'#{cmd} #{tmpf}#{extname}' not successful!" unless system("#{cmd} #{tmpf}#{extname}")
-      msg = File.open(tmpf ,"r") {|f| f.read}
-      FileUtils.rm(tmpf)
+      msg = nil
+      temp_file("CompressionMeta") {|tmpf|
+        compressed = Pathname.new("#{tmpf}#{extname}")
+        File.open(compressed ,"w") {|f| f.write(message.msg)}
+        throw "'#{cmd} #{tmpf}#{extname}' not successful!" unless system("#{cmd} #{compressed}")
+        msg = File.open(tmpf ,"r") {|f| f.read}
+        compressed.unlink if compressed.exist?
+      }
       
       return self.new.prisma_initialize(meta_message,{:extname => extname,
 			:inflate_command => cmd,

Modified: incubator/alois/trunk/prisma/lib/prisma/archivator.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/lib/prisma/archivator.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/lib/prisma/archivator.rb (original)
+++ incubator/alois/trunk/prisma/lib/prisma/archivator.rb Tue Nov 30 15:11:46 2010
@@ -75,6 +75,10 @@
 	  msg = nil
 	  begin	 
 	    # leave this for security (evaluating string)
+            if line =~ /^(\S+)\:(\".*\")$/
+              $log.debug{"Removing leading filename #{$1}"}
+              line = $2
+            end
 	    throw "Leading and/or tailing \" not found in file '#{archivfile}:#{i}'!" unless line =~ /^".*\"$/
  	    throw "Suspicious line found in file '#{archivfile}:#{i}' (unquoted \" found)!" if line =~ /\".*[^\\]\".*\"/
 	    msg = Object.from_yaml(eval(line))

Modified: incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb (original)
+++ incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb Tue Nov 30 15:11:46 2010
@@ -218,7 +218,7 @@ module BaseMixin
 	  $log.debug{"Results: #{results.inspect}"}
 	  $log.debug("Fields:  #{expr[:fields].inspect}") if $log.debug?
 	  if results.size != expr[:fields].length
-	    throw "Regexp matched (#{results.size}) more than fields (#{expr[:fields].length}) defined. (Tansform against: #{self.name})"
+	    throw "Regexp matched (#{results.size}) more than fields (#{expr[:fields].length}) defined. (Res: #{results.inspect} Tansform against: #{self.name})"
 	  end
 	  
 	  values = {}

Modified: incubator/alois/trunk/prisma/lib/prisma/transform.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/lib/prisma/transform.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/lib/prisma/transform.rb (original)
+++ incubator/alois/trunk/prisma/lib/prisma/transform.rb Tue Nov 30 15:11:46 2010
@@ -27,159 +27,155 @@ module Prisma
       source = FileRaw.create(filename,options)
     end
     
-
-  
-  def Prisma.transform_messages(limit = 100)
-
-#    [ 'HUP' , 'INT' , 'TERM', 'USR1', 'USR2'].each {|s|
-#      Signal.trap(s) {
-#	$log.error("SIGNAL---#{s}") rescue $log.error("aaaaaaaaaasldkfj")
-#      }
-#    }
-#    Signal.trap("TERM") do
-#      # End current tranform.
-#      $log.error "Caught #{Process.pid} signal TERM." if $log.info?
-#      $log.error "Going to stop..." if $log.info?
-#      $terminate = true
-#    end
-    $terminate = false
-
-    last_id = 0
-    total = 0
-    iter = Message.find(:all,:conditions => "ID>#{last_id}" ,:limit => limit,:order=>"id")
-    while iter.length > 0 and not $terminate
-      main_cost = Benchmark.measure {
-	iter.each { |message|
-	  $log.info("Transforming message #{message.id}") if $log.info?
-	  meta = message.parent
-	  if meta == nil then
-	    $log.warn("Message has no meta entry.") if $log.warn?
-	  else
-	    Prisma.transaction(meta) do
-	      meta.transform	    
-	    end
-	  end
-	  last_id = message.id
-	}
-      }.real
+    def self.transform_messages(limit = 100)
+      #    [ 'HUP' , 'INT' , 'TERM', 'USR1', 'USR2'].each {|s|
+      #      Signal.trap(s) {
+      #	$log.error("SIGNAL---#{s}") rescue $log.error("aaaaaaaaaasldkfj")
+      #      }
+      #    }
+      #    Signal.trap("TERM") do
+      #      # End current tranform.
+      #      $log.error "Caught #{Process.pid} signal TERM." if $log.info?
+      #      $log.error "Going to stop..." if $log.info?
+      #      $terminate = true
+      #    end
+      $terminate = false
       
-      total = total + iter.length
-
-      Prisma.perf {"Done #{limit} in #{main_cost}s (#{limit/ main_cost}/s)."}
-      Prisma.perf {"Current message is #{last_id} done #{total}."}
-
+      last_id = 0
+      total = 0
       iter = Message.find(:all,:conditions => "ID>#{last_id}" ,:limit => limit,:order=>"id")
-    end
-  end
-  # TODO: comment transform_queues function
-  # transform all available dbsources
-  def self.transform_queues(type = :fifo, count = nil, waiting_time=nil)
-    if $do_not_run_prisma
-      $log.error{ "Will not start primsa queues cause option do_not_run_prisma is defined"}
-      return
-    end
-    pids = []
-    for klass in get_classes(:raw)
-      $log.warn("Starting queue for class #{klass.name}.")
-      pid = fork do
-	define_new_logger(klass.name)
-        Prisma.reconnect
-
-#	Signal.trap("USR1") do
-#	  # End current transform and begin new transform
-#	  $log.info "Child #{Process.pid} caught signal USR1" if $log.info?
-#	  $log.info "Going to restart..." if $log.info?
-#	  $restart = true
-#	  $terminate = true
-#	end
-	Signal.trap("TERM") do
-	  # End current tranform.
-	  $log.warn{"Caught #{Process.pid} signal TERM."}
-	  $log.warn{"Going to stop..."}
-	  $restart = false
-	  $terminate = true
-	end
-
-	source = nil
-	$restart = true
-	while $restart
-	  $terminate = false
-	  $restart = type != :all
-	  begin
-	    $log.info "Process last records of class #{klass.name}, #{count} per step" if $log.info?
-	    if source 
-	      source.finished = true
-	      source.save	      
-	    end
-	    source = SourceDbMeta.new.prisma_initialize(type, klass, count,nil, false, waiting_time)
-	    $enable_dublette_recognition = source.may_contain_dublettes
-	    source.transform
-	  rescue ActiveRecord::Transactions::TransactionError
-	    raise $!
-	  rescue	    
-	    $log.error{ "Processing class #{klass.name} threw error #{$!}!" }
-	    for line in $!.backtrace
-	      $log.error{"#{line}"}
-	    end
-	    if Prisma::Database.check_connections
-	      $log.fatal{"Connections are good, so something bad happened. Will not risk to restart queue #{klass.name}."}
-	      $terminate = true
-	      $restart = false
-	    else
-	      $log.info{"At least one prisma connection is down."}
-	      connection_wait_count = 1
-	      while not (Prisma::Database.check_connections or $terminate)
-		wait_time = connection_wait_count
-		wait_time = 30 if wait_time > 30 
-		$log.warn{"#{connection_wait_count} Waiting #{wait_time} seconds."}
-		Prisma.save_sleep(wait_time)
-		connection_wait_count += 1
-	      end
-	      if !$terminate
-		$log.info{"Connection are good again. Restarting queue #{klass.name}."}		
-	      end
-	    end
-	  end
-	end
-	$log.info "Child #{Process.pid} end." if $log.info?
-        $log.info "Stopped processing class #{klass.name}." if $log.info?
+      while iter.length > 0 and not $terminate
+        main_cost = Benchmark.measure {
+          iter.each { |message|
+            $log.info("Transforming message #{message.id}") if $log.info?
+            meta = message.parent
+            if meta == nil then
+              $log.warn("Message has no meta entry.") if $log.warn?
+            else
+              Prisma::Database.transaction(meta.class) do
+                meta.transform	    
+              end
+            end
+            last_id = message.id
+          }
+        }.real
+        
+        total = total + iter.length
+        
+        Prisma::Util.perf {"Done #{limit} in #{main_cost}s (#{limit/ main_cost}/s)."}
+        Prisma::Util.perf {"Current message is #{last_id} done #{total}."}
+        
+        iter = Message.find(:all,:conditions => "ID>#{last_id}" ,:limit => limit,:order=>"id")
       end
-      pids.push(pid)
     end
-#    Signal.trap("USR1") do
-#      # End current transform and begin new transform
-#      $log.info "Caught signal USR1" if $log.info?
-#      $log.info "Going to restart prismas." if $log.info?
-#      for pid in pids
-#	Process.kill("USR1",pid)
-#      end
-#    end
-    Signal.trap("TERM") do
-      # End current tranform.
-      $log.warn{"Caught signal TERM."}
-      $log.warn{"Going to stop prismas."}
-      $terminate = true
-      $log.error{"No pids found."} if pids.nil? or pids.length == 0
+    
+    # TODO: comment transform_queues function
+    # transform all available dbsources
+    def self.transform_queues(type = :fifo, count = nil, waiting_time=nil)
+      if $do_not_run_prisma
+        $log.error{ "Will not start primsa queues cause option do_not_run_prisma is defined"}
+        return
+      end
+      pids = []
+      for klass in get_classes(:raw)
+        $log.warn("Starting queue for class #{klass.name}.")
+        pid = fork do
+          define_new_logger(klass.name)
+          Prisma.reconnect
+          
+          #	Signal.trap("USR1") do
+          #	  # End current transform and begin new transform
+          #	  $log.info "Child #{Process.pid} caught signal USR1" if $log.info?
+          #	  $log.info "Going to restart..." if $log.info?
+          #	  $restart = true
+          #	  $terminate = true
+          #	end
+          Signal.trap("TERM") do
+            # End current tranform.
+            $log.warn{"Caught #{Process.pid} signal TERM."}
+            $log.warn{"Going to stop..."}
+            $restart = false
+            $terminate = true
+          end
+          
+          source = nil
+          $restart = true
+          while $restart
+            $terminate = false
+            $restart = type != :all
+            begin
+              $log.info "Process last records of class #{klass.name}, #{count} per step" if $log.info?
+              if source 
+                source.finished = true
+                source.save	      
+              end
+              source = SourceDbMeta.new.prisma_initialize(type, klass, count,nil, false, waiting_time)
+              $enable_dublette_recognition = source.may_contain_dublettes
+              source.transform
+            rescue ActiveRecord::Transactions::TransactionError
+              raise $!
+            rescue	    
+              $log.error{ "Processing class #{klass.name} threw error #{$!}!" }
+              for line in $!.backtrace
+                $log.error{"#{line}"}
+              end
+              if Prisma::Database.check_connections
+                $log.fatal{"Connections are good, so something bad happened. Will not risk to restart queue #{klass.name}."}
+                $terminate = true
+                $restart = false
+              else
+                $log.info{"At least one prisma connection is down."}
+                connection_wait_count = 1
+                while not (Prisma::Database.check_connections or $terminate)
+                  wait_time = connection_wait_count
+                  wait_time = 30 if wait_time > 30 
+                  $log.warn{"#{connection_wait_count} Waiting #{wait_time} seconds."}
+                  Prisma.save_sleep(wait_time)
+                  connection_wait_count += 1
+                end
+                if !$terminate
+                  $log.info{"Connection are good again. Restarting queue #{klass.name}."}		
+                end
+              end
+            end
+          end
+          $log.info "Child #{Process.pid} end." if $log.info?
+          $log.info "Stopped processing class #{klass.name}." if $log.info?
+        end
+        pids.push(pid)
+      end
+      #    Signal.trap("USR1") do
+      #      # End current transform and begin new transform
+      #      $log.info "Caught signal USR1" if $log.info?
+      #      $log.info "Going to restart prismas." if $log.info?
+      #      for pid in pids
+      #	Process.kill("USR1",pid)
+      #      end
+      #    end
+      Signal.trap("TERM") do
+        # End current tranform.
+        $log.warn{"Caught signal TERM."}
+        $log.warn{"Going to stop prismas."}
+        $terminate = true
+        $log.error{"No pids found."} if pids.nil? or pids.length == 0
+        for pid in pids
+          $log.warn{"Sending term to #{pid}."}
+          Process.kill("TERM",pid)
+        end
+      end
+      
+      $log.debug "Parent process waiting." if $log.debug?
+      while !$terminate
+        sleep 1
+      end
+      $log.warn{"Going to wait for children."}
       for pid in pids
-	$log.warn{"Sending term to #{pid}."}
-	Process.kill("TERM",pid)
+        ret = Process.wait
+        $log.warn{"Child returned with:#{ret}"}
+        $log.error{"Child #{ret} returned without shutdown!"} if not $terminate and not type==:all
       end
+      $log.warn{"Prisma main process ended."}
     end
-
-    $log.debug "Parent process waiting." if $log.debug?
-    while !$terminate
-      sleep 1
-    end
-    $log.warn{"Going to wait for children."}
-    for pid in pids
-      ret = Process.wait
-      $log.warn{"Child returned with:#{ret}"}
-      $log.error{"Child #{ret} returned without shutdown!"} if not $terminate and not type==:all
-    end
-    $log.warn{"Prisma main process ended."}
-  end
-
-
-
+    
   end
 end

Modified: incubator/alois/trunk/prisma/test/test_helper.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/test/test_helper.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/test/test_helper.rb (original)
+++ incubator/alois/trunk/prisma/test/test_helper.rb Tue Nov 30 15:11:46 2010
@@ -23,7 +23,7 @@ require File.expand_path(File.dirname(__
 Prisma::Database.load_all
 
 require "active_record/fixtures"
-class PrismaTest < Test::Unit::TestCase
+class Test::Unit::TestCase
 
   def insert_fixture(klass)
     fixture_file = Pathname.new(__FILE__).dirname + "fixtures/" + klass.table_name
@@ -34,6 +34,66 @@ class PrismaTest < Test::Unit::TestCase
     fix.insert_fixtures
   end
 
+  def file_info(filename)
+    basename = Pathname.new(filename).basename.to_s
+    unless basename =~ /([^\.]*)\.(.*)/
+      g = Dir.glob("#{filename}.*").reject {|f| f =~ /\~$/ or File.basename(f) =~ /\..*\./}
+      throw "More than one file found. (#{g.inspect})" if g.length > 1
+      throw "File not found '#{filename}.*'" if g.length == 0
+      filename = g[0]
+    end
+    basename = Pathname.new(filename).basename.to_s
+    throw "Malformed filename '#{filename}'" unless basename =~ /([^\.]*)\.(.*)/
+    table_name, type = $1,$2
+    table_class = Prisma::Database.get_class_from_tablename(table_name)
+    throw "Table '#{table_name}' not found." unless table_class
+    # returns {:type => <part after first dot>, :table_class => <class for table_name>, :filename => <resolved path>}
+    {:type => type, :table_class => table_class, :filename => filename}
+  end
+  # Empties Message and the target table, loads +filename+, runs Prisma::Transform.transform_messages and asserts how many messages are left.
+  def load_and_transform_file(filename, expected_message_count = 0)
+    fi = file_info(filename)
+    Message.delete_all
+    fi[:table_class].delete_all
+    # both tables were emptied above, so the count asserted below only reflects this file
+    load_file(filename)
+    Prisma::Transform.transform_messages
+    if Message.count > 0
+      Message.find(:all).each {|m|
+	p m.msg
+      }
+      print "Found still #{Message.count} messages.\n"
+    end
+    assert_equal expected_message_count, Message.count
+  end
+  # Loads a file resolved by file_info: "messages" -> one saved Message per line (returns them), "archive" -> a saved ArchiveMeta.
+  def load_file(filename)
+    fi = file_info(filename)
+    table_class = fi[:table_class]
+
+    case fi[:type]
+    when "messages"
+      Message.delete_all
+      msgs = []
+      File.foreach(fi[:filename]) do |line|
+	# correct windows linefeeds ("\r\n" -> "\n")
+	line = line[0..-3] + "\n" if line.ends_with?("\r\n")
+
+	parent = table_class.new
+	message = Message.new.prisma_initialize(parent,line)
+	message.save
+	msgs.push message
+      end
+      return msgs
+    when "archive"
+      a = ArchiveMeta.new.prisma_initialize(fi[:filename])
+      a.save            
+      return a
+    else
+      throw "unknown type '#{fi[:type]}'."
+    end
+  end
+
 end
 #require 'test_help'
 =begin
@@ -86,65 +146,6 @@ class ActiveSupport::TestCase
     source.transform
   end
 
-  def file_info(filename)
-    basename = Pathname.new(filename).basename.to_s
-    unless basename =~ /([^\.]*)\.(.*)/
-      g = Dir.glob(filename + ".*").reject {|f| f =~ /\~$/ or f=~/\..*\./}
-      throw "More than one file found. (#{g.inspect})" if g.length > 1
-      throw "File not found '#{filename + ".*"}'" if g.length == 0
-      filename = g[0]
-    end
-    basename = Pathname.new(filename).basename.to_s
-    throw "Malformed filename '#{filename}'" unless basename =~ /([^\.]*)\.(.*)/
-    table_name, type = $1,$2
-    table_class = Prisma.get_class_from_tablename(table_name)
-    throw "Table '#{table_name}' not found." unless table_class
-    
-    ret = {:type => type, :table_class => table_class, :filename => filename}
-  end
-
-  def load_file(filename)
-    fi = file_info(filename)
-    table_class = fi[:table_class]
-
-    case fi[:type]
-    when "messages"
-      Message.delete_all
-      msgs = []
-      for line in open(fi[:filename])
-	# correct windows linefeeds
-	line = line[0..-3] + "\n" if line.ends_with?("\r\n")
-
-	parent = table_class.new
-	message = Message.new.prisma_initialize(parent,line)
-	message.save
-	msgs.push message
-      end
-      return msgs
-    when "archive"
-      a = ArchiveMeta.new.prisma_initialize(fi[:filename])
-      a.save            
-      return a
-    else
-      throw "unknown type '#{type}'."
-    end
-  end
-
-  def load_and_transform_file(filename, expected_message_count = 0)
-    fi = file_info(filename)
-    Message.delete_all
-    fi[:table_class].delete_all
-
-    ret = load_file(filename)
-    Prisma.transform_messages
-    if Message.count > 0
-      Message.find(:all).each {|m|
-	p m.msg
-      }
-      print "Found still #{Message.count} messages.\n"
-    end
-    assert_equal expected_message_count, Message.count
-  end
 
   # from http://wiki.rubyonrails.org/rails/pages/HowtoUseMultipleDatabasesWithFixtures
   

Modified: incubator/alois/trunk/rails/test/test_helper.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/rails/test/test_helper.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/rails/test/test_helper.rb (original)
+++ incubator/alois/trunk/rails/test/test_helper.rb Tue Nov 30 15:11:46 2010
@@ -101,7 +101,7 @@ class ActiveSupport::TestCase
       end
       return msgs
     when "archive"
-      a = ArchiveMeta.new(fi[:filename])
+      a = ArchiveMeta.new.prisma_initialize(fi[:filename])
       a.save            
       return a
     else