You are viewing a plain text version of this content. The canonical link for it is here.
Posted to alois-commits@incubator.apache.org by fl...@apache.org on 2010/11/30 15:11:47 UTC
svn commit: r1040571 - in /incubator/alois/trunk:
prisma/data/prisma/prisma_database/model/ prisma/lib/prisma/ prisma/test/
rails/test/
Author: flavio
Date: Tue Nov 30 15:11:46 2010
New Revision: 1040571
URL: http://svn.apache.org/viewvc?rev=1040571&view=rev
Log:
Worked on testing, a few minor bugfixes, and better logging
Modified:
incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb
incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb
incubator/alois/trunk/prisma/lib/prisma/archivator.rb
incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb
incubator/alois/trunk/prisma/lib/prisma/transform.rb
incubator/alois/trunk/prisma/test/test_helper.rb
incubator/alois/trunk/rails/test/test_helper.rb
Modified: incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb (original)
+++ incubator/alois/trunk/prisma/data/prisma/prisma_database/model/archive_meta.rb Tue Nov 30 15:11:46 2010
@@ -25,20 +25,20 @@
throw "Suspicious line found at line #{self.current} (unquoted \" found)!" if line =~ /\".*[^\\]\".*\"/
msg = YAML.parse(eval(line)).transform
rescue
- $log.error "Error getting archive record \##{self.current}. (#{$!.message})" if $log.error?
+ $log.error{"Error getting archive record \##{self.current}. (#{$!.message})"}
end
yield msg if msg
exit(0) if $terminate
end
end
- def initialize( filename )
- super(nil)
+ def prisma_initialize( filename )
self.filename = filename
self.current = 0
self.total = open(filename).readlines.length
self.todo = self.total
self.save
+ self
end
def messages
Modified: incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb (original)
+++ incubator/alois/trunk/prisma/data/prisma/prisma_database/model/compression_meta.rb Tue Nov 30 15:11:46 2010
@@ -1,3 +1,4 @@
+# -*- coding: undecided -*-
# Copyright 2010 The Apache Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -48,11 +49,14 @@ class CompressionMeta < ActiveRecord::Ba
extname = meta_message.read_option("extname")
cmd = DECOMPRESSION_COMMANDS[extname]
- tmpf = "#{Dir.tmpdir}/CompressMeta-{Process.pid}"
- File.open(tmpf + extname ,"w") {|f| f.write(message.msg)}
- throw "'#{cmd} #{tmpf}#{extname}' not successful!" unless system("#{cmd} #{tmpf}#{extname}")
- msg = File.open(tmpf ,"r") {|f| f.read}
- FileUtils.rm(tmpf)
+ msg = nil
+ temp_file("CompressionMeta") {|tmpf|
+ compressed = Pathname.new("#{tmpf}#{extname}")
+ File.open(compressed ,"w") {|f| f.write(message.msg)}
+ throw "'#{cmd} #{tmpf}#{extname}' not successful!" unless system("#{cmd} #{compressed}")
+ msg = File.open(tmpf ,"r") {|f| f.read}
+ compressed.unlink if compressed.exist?
+ }
return self.new.prisma_initialize(meta_message,{:extname => extname,
:inflate_command => cmd,
Modified: incubator/alois/trunk/prisma/lib/prisma/archivator.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/lib/prisma/archivator.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/lib/prisma/archivator.rb (original)
+++ incubator/alois/trunk/prisma/lib/prisma/archivator.rb Tue Nov 30 15:11:46 2010
@@ -75,6 +75,10 @@
msg = nil
begin
# leave this for security (evaluating string)
+ if line =~ /^(\S+)\:(\".*\")$/
+ $log.debug{"Removing leading filename #{$1}"}
+ line = $2
+ end
throw "Leading and/or tailing \" not found in file '#{archivfile}:#{i}'!" unless line =~ /^".*\"$/
throw "Suspicious line found in file '#{archivfile}:#{i}' (unquoted \" found)!" if line =~ /\".*[^\\]\".*\"/
msg = Object.from_yaml(eval(line))
Modified: incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb (original)
+++ incubator/alois/trunk/prisma/lib/prisma/base_mixin.rb Tue Nov 30 15:11:46 2010
@@ -218,7 +218,7 @@ module BaseMixin
$log.debug{"Results: #{results.inspect}"}
$log.debug("Fields: #{expr[:fields].inspect}") if $log.debug?
if results.size != expr[:fields].length
- throw "Regexp matched (#{results.size}) more than fields (#{expr[:fields].length}) defined. (Tansform against: #{self.name})"
+ throw "Regexp matched (#{results.size}) more than fields (#{expr[:fields].length}) defined. (Res: #{results.inspect} Tansform against: #{self.name})"
end
values = {}
Modified: incubator/alois/trunk/prisma/lib/prisma/transform.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/lib/prisma/transform.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/lib/prisma/transform.rb (original)
+++ incubator/alois/trunk/prisma/lib/prisma/transform.rb Tue Nov 30 15:11:46 2010
@@ -27,159 +27,155 @@ module Prisma
source = FileRaw.create(filename,options)
end
-
-
- def Prisma.transform_messages(limit = 100)
-
-# [ 'HUP' , 'INT' , 'TERM', 'USR1', 'USR2'].each {|s|
-# Signal.trap(s) {
-# $log.error("SIGNAL---#{s}") rescue $log.error("aaaaaaaaaasldkfj")
-# }
-# }
-# Signal.trap("TERM") do
-# # End current tranform.
-# $log.error "Caught #{Process.pid} signal TERM." if $log.info?
-# $log.error "Going to stop..." if $log.info?
-# $terminate = true
-# end
- $terminate = false
-
- last_id = 0
- total = 0
- iter = Message.find(:all,:conditions => "ID>#{last_id}" ,:limit => limit,:order=>"id")
- while iter.length > 0 and not $terminate
- main_cost = Benchmark.measure {
- iter.each { |message|
- $log.info("Transforming message #{message.id}") if $log.info?
- meta = message.parent
- if meta == nil then
- $log.warn("Message has no meta entry.") if $log.warn?
- else
- Prisma.transaction(meta) do
- meta.transform
- end
- end
- last_id = message.id
- }
- }.real
+ def self.transform_messages(limit = 100)
+ # [ 'HUP' , 'INT' , 'TERM', 'USR1', 'USR2'].each {|s|
+ # Signal.trap(s) {
+ # $log.error("SIGNAL---#{s}") rescue $log.error("aaaaaaaaaasldkfj")
+ # }
+ # }
+ # Signal.trap("TERM") do
+ # # End current tranform.
+ # $log.error "Caught #{Process.pid} signal TERM." if $log.info?
+ # $log.error "Going to stop..." if $log.info?
+ # $terminate = true
+ # end
+ $terminate = false
- total = total + iter.length
-
- Prisma.perf {"Done #{limit} in #{main_cost}s (#{limit/ main_cost}/s)."}
- Prisma.perf {"Current message is #{last_id} done #{total}."}
-
+ last_id = 0
+ total = 0
iter = Message.find(:all,:conditions => "ID>#{last_id}" ,:limit => limit,:order=>"id")
- end
- end
- # TODO: comment transform_queues function
- # transform all available dbsources
- def self.transform_queues(type = :fifo, count = nil, waiting_time=nil)
- if $do_not_run_prisma
- $log.error{ "Will not start primsa queues cause option do_not_run_prisma is defined"}
- return
- end
- pids = []
- for klass in get_classes(:raw)
- $log.warn("Starting queue for class #{klass.name}.")
- pid = fork do
- define_new_logger(klass.name)
- Prisma.reconnect
-
-# Signal.trap("USR1") do
-# # End current transform and begin new transform
-# $log.info "Child #{Process.pid} caught signal USR1" if $log.info?
-# $log.info "Going to restart..." if $log.info?
-# $restart = true
-# $terminate = true
-# end
- Signal.trap("TERM") do
- # End current tranform.
- $log.warn{"Caught #{Process.pid} signal TERM."}
- $log.warn{"Going to stop..."}
- $restart = false
- $terminate = true
- end
-
- source = nil
- $restart = true
- while $restart
- $terminate = false
- $restart = type != :all
- begin
- $log.info "Process last records of class #{klass.name}, #{count} per step" if $log.info?
- if source
- source.finished = true
- source.save
- end
- source = SourceDbMeta.new.prisma_initialize(type, klass, count,nil, false, waiting_time)
- $enable_dublette_recognition = source.may_contain_dublettes
- source.transform
- rescue ActiveRecord::Transactions::TransactionError
- raise $!
- rescue
- $log.error{ "Processing class #{klass.name} threw error #{$!}!" }
- for line in $!.backtrace
- $log.error{"#{line}"}
- end
- if Prisma::Database.check_connections
- $log.fatal{"Connections are good, so something bad happened. Will not risk to restart queue #{klass.name}."}
- $terminate = true
- $restart = false
- else
- $log.info{"At least one prisma connection is down."}
- connection_wait_count = 1
- while not (Prisma::Database.check_connections or $terminate)
- wait_time = connection_wait_count
- wait_time = 30 if wait_time > 30
- $log.warn{"#{connection_wait_count} Waiting #{wait_time} seconds."}
- Prisma.save_sleep(wait_time)
- connection_wait_count += 1
- end
- if !$terminate
- $log.info{"Connection are good again. Restarting queue #{klass.name}."}
- end
- end
- end
- end
- $log.info "Child #{Process.pid} end." if $log.info?
- $log.info "Stopped processing class #{klass.name}." if $log.info?
+ while iter.length > 0 and not $terminate
+ main_cost = Benchmark.measure {
+ iter.each { |message|
+ $log.info("Transforming message #{message.id}") if $log.info?
+ meta = message.parent
+ if meta == nil then
+ $log.warn("Message has no meta entry.") if $log.warn?
+ else
+ Prisma::Database.transaction(meta.class) do
+ meta.transform
+ end
+ end
+ last_id = message.id
+ }
+ }.real
+
+ total = total + iter.length
+
+ Prisma::Util.perf {"Done #{limit} in #{main_cost}s (#{limit/ main_cost}/s)."}
+ Prisma::Util.perf {"Current message is #{last_id} done #{total}."}
+
+ iter = Message.find(:all,:conditions => "ID>#{last_id}" ,:limit => limit,:order=>"id")
end
- pids.push(pid)
end
-# Signal.trap("USR1") do
-# # End current transform and begin new transform
-# $log.info "Caught signal USR1" if $log.info?
-# $log.info "Going to restart prismas." if $log.info?
-# for pid in pids
-# Process.kill("USR1",pid)
-# end
-# end
- Signal.trap("TERM") do
- # End current tranform.
- $log.warn{"Caught signal TERM."}
- $log.warn{"Going to stop prismas."}
- $terminate = true
- $log.error{"No pids found."} if pids.nil? or pids.length == 0
+
+ # TODO: comment transform_queues function
+ # transform all available dbsources
+ def self.transform_queues(type = :fifo, count = nil, waiting_time=nil)
+ if $do_not_run_prisma
+ $log.error{ "Will not start primsa queues cause option do_not_run_prisma is defined"}
+ return
+ end
+ pids = []
+ for klass in get_classes(:raw)
+ $log.warn("Starting queue for class #{klass.name}.")
+ pid = fork do
+ define_new_logger(klass.name)
+ Prisma.reconnect
+
+ # Signal.trap("USR1") do
+ # # End current transform and begin new transform
+ # $log.info "Child #{Process.pid} caught signal USR1" if $log.info?
+ # $log.info "Going to restart..." if $log.info?
+ # $restart = true
+ # $terminate = true
+ # end
+ Signal.trap("TERM") do
+ # End current tranform.
+ $log.warn{"Caught #{Process.pid} signal TERM."}
+ $log.warn{"Going to stop..."}
+ $restart = false
+ $terminate = true
+ end
+
+ source = nil
+ $restart = true
+ while $restart
+ $terminate = false
+ $restart = type != :all
+ begin
+ $log.info "Process last records of class #{klass.name}, #{count} per step" if $log.info?
+ if source
+ source.finished = true
+ source.save
+ end
+ source = SourceDbMeta.new.prisma_initialize(type, klass, count,nil, false, waiting_time)
+ $enable_dublette_recognition = source.may_contain_dublettes
+ source.transform
+ rescue ActiveRecord::Transactions::TransactionError
+ raise $!
+ rescue
+ $log.error{ "Processing class #{klass.name} threw error #{$!}!" }
+ for line in $!.backtrace
+ $log.error{"#{line}"}
+ end
+ if Prisma::Database.check_connections
+ $log.fatal{"Connections are good, so something bad happened. Will not risk to restart queue #{klass.name}."}
+ $terminate = true
+ $restart = false
+ else
+ $log.info{"At least one prisma connection is down."}
+ connection_wait_count = 1
+ while not (Prisma::Database.check_connections or $terminate)
+ wait_time = connection_wait_count
+ wait_time = 30 if wait_time > 30
+ $log.warn{"#{connection_wait_count} Waiting #{wait_time} seconds."}
+ Prisma.save_sleep(wait_time)
+ connection_wait_count += 1
+ end
+ if !$terminate
+ $log.info{"Connection are good again. Restarting queue #{klass.name}."}
+ end
+ end
+ end
+ end
+ $log.info "Child #{Process.pid} end." if $log.info?
+ $log.info "Stopped processing class #{klass.name}." if $log.info?
+ end
+ pids.push(pid)
+ end
+ # Signal.trap("USR1") do
+ # # End current transform and begin new transform
+ # $log.info "Caught signal USR1" if $log.info?
+ # $log.info "Going to restart prismas." if $log.info?
+ # for pid in pids
+ # Process.kill("USR1",pid)
+ # end
+ # end
+ Signal.trap("TERM") do
+ # End current tranform.
+ $log.warn{"Caught signal TERM."}
+ $log.warn{"Going to stop prismas."}
+ $terminate = true
+ $log.error{"No pids found."} if pids.nil? or pids.length == 0
+ for pid in pids
+ $log.warn{"Sending term to #{pid}."}
+ Process.kill("TERM",pid)
+ end
+ end
+
+ $log.debug "Parent process waiting." if $log.debug?
+ while !$terminate
+ sleep 1
+ end
+ $log.warn{"Going to wait for children."}
for pid in pids
- $log.warn{"Sending term to #{pid}."}
- Process.kill("TERM",pid)
+ ret = Process.wait
+ $log.warn{"Child returned with:#{ret}"}
+ $log.error{"Child #{ret} returned without shutdown!"} if not $terminate and not type==:all
end
+ $log.warn{"Prisma main process ended."}
end
-
- $log.debug "Parent process waiting." if $log.debug?
- while !$terminate
- sleep 1
- end
- $log.warn{"Going to wait for children."}
- for pid in pids
- ret = Process.wait
- $log.warn{"Child returned with:#{ret}"}
- $log.error{"Child #{ret} returned without shutdown!"} if not $terminate and not type==:all
- end
- $log.warn{"Prisma main process ended."}
- end
-
-
-
+
end
end
Modified: incubator/alois/trunk/prisma/test/test_helper.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/prisma/test/test_helper.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/prisma/test/test_helper.rb (original)
+++ incubator/alois/trunk/prisma/test/test_helper.rb Tue Nov 30 15:11:46 2010
@@ -23,7 +23,7 @@ require File.expand_path(File.dirname(__
Prisma::Database.load_all
require "active_record/fixtures"
-class PrismaTest < Test::Unit::TestCase
+class Test::Unit::TestCase
def insert_fixture(klass)
fixture_file = Pathname.new(__FILE__).dirname + "fixtures/" + klass.table_name
@@ -34,6 +34,66 @@ class PrismaTest < Test::Unit::TestCase
fix.insert_fixtures
end
+ def file_info(filename)
+ basename = Pathname.new(filename).basename.to_s
+ unless basename =~ /([^\.]*)\.(.*)/
+ g = Dir.glob(filename + ".*").reject {|f| f =~ /\~$/ or f=~/\..*\./}
+ throw "More than one file found. (#{g.inspect})" if g.length > 1
+ throw "File not found '#{filename + ".*"}'" if g.length == 0
+ filename = g[0]
+ end
+ basename = Pathname.new(filename).basename.to_s
+ throw "Malformed filename '#{filename}'" unless basename =~ /([^\.]*)\.(.*)/
+ table_name, type = $1,$2
+ table_class = Prisma::Database.get_class_from_tablename(table_name)
+ throw "Table '#{table_name}' not found." unless table_class
+
+ ret = {:type => type, :table_class => table_class, :filename => filename}
+ end
+
+ def load_and_transform_file(filename, expected_message_count = 0)
+ fi = file_info(filename)
+ Message.delete_all
+ fi[:table_class].delete_all
+
+ ret = load_file(filename)
+ Prisma::Transform.transform_messages
+ if Message.count > 0
+ Message.find(:all).each {|m|
+ p m.msg
+ }
+ print "Found still #{Message.count} messages.\n"
+ end
+ assert_equal expected_message_count, Message.count
+ end
+
+ def load_file(filename)
+ fi = file_info(filename)
+ table_class = fi[:table_class]
+
+ case fi[:type]
+ when "messages"
+ Message.delete_all
+ msgs = []
+ for line in open(fi[:filename])
+ # correct windows linefeeds
+ line = line[0..-3] + "\n" if line.ends_with?("\r\n")
+
+ parent = table_class.new
+ message = Message.new.prisma_initialize(parent,line)
+ message.save
+ msgs.push message
+ end
+ return msgs
+ when "archive"
+ a = ArchiveMeta.new.prisma_initialize(fi[:filename])
+ a.save
+ return a
+ else
+ throw "unknown type '#{type}'."
+ end
+ end
+
end
#require 'test_help'
=begin
@@ -86,65 +146,6 @@ class ActiveSupport::TestCase
source.transform
end
- def file_info(filename)
- basename = Pathname.new(filename).basename.to_s
- unless basename =~ /([^\.]*)\.(.*)/
- g = Dir.glob(filename + ".*").reject {|f| f =~ /\~$/ or f=~/\..*\./}
- throw "More than one file found. (#{g.inspect})" if g.length > 1
- throw "File not found '#{filename + ".*"}'" if g.length == 0
- filename = g[0]
- end
- basename = Pathname.new(filename).basename.to_s
- throw "Malformed filename '#{filename}'" unless basename =~ /([^\.]*)\.(.*)/
- table_name, type = $1,$2
- table_class = Prisma.get_class_from_tablename(table_name)
- throw "Table '#{table_name}' not found." unless table_class
-
- ret = {:type => type, :table_class => table_class, :filename => filename}
- end
-
- def load_file(filename)
- fi = file_info(filename)
- table_class = fi[:table_class]
-
- case fi[:type]
- when "messages"
- Message.delete_all
- msgs = []
- for line in open(fi[:filename])
- # correct windows linefeeds
- line = line[0..-3] + "\n" if line.ends_with?("\r\n")
-
- parent = table_class.new
- message = Message.new.prisma_initialize(parent,line)
- message.save
- msgs.push message
- end
- return msgs
- when "archive"
- a = ArchiveMeta.new.prisma_initialize(fi[:filename])
- a.save
- return a
- else
- throw "unknown type '#{type}'."
- end
- end
-
- def load_and_transform_file(filename, expected_message_count = 0)
- fi = file_info(filename)
- Message.delete_all
- fi[:table_class].delete_all
-
- ret = load_file(filename)
- Prisma.transform_messages
- if Message.count > 0
- Message.find(:all).each {|m|
- p m.msg
- }
- print "Found still #{Message.count} messages.\n"
- end
- assert_equal expected_message_count, Message.count
- end
# from http://wiki.rubyonrails.org/rails/pages/HowtoUseMultipleDatabasesWithFixtures
Modified: incubator/alois/trunk/rails/test/test_helper.rb
URL: http://svn.apache.org/viewvc/incubator/alois/trunk/rails/test/test_helper.rb?rev=1040571&r1=1040570&r2=1040571&view=diff
==============================================================================
--- incubator/alois/trunk/rails/test/test_helper.rb (original)
+++ incubator/alois/trunk/rails/test/test_helper.rb Tue Nov 30 15:11:46 2010
@@ -101,7 +101,7 @@ class ActiveSupport::TestCase
end
return msgs
when "archive"
- a = ArchiveMeta.new(fi[:filename])
+ a = ArchiveMeta.new.prisma_initialize(fi[:filename])
a.save
return a
else